// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"strings"
	"sync"

	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/retrieval/discovery"
	"github.com/prometheus/prometheus/storage"
)

// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// Sources() is guaranteed to be called exactly once before each call to Run().
// On a call to Run() implementing types must send a valid target group for each of
// the sources they declared in the last call to Sources().
type TargetProvider interface {
	// Sources returns the source identifiers the provider is currently aware of.
	Sources() []string
	// Run hands a channel to the target provider through which it can send
	// updated target groups. The channel must be closed by the target provider
	// if no more updates will be sent.
	// On receiving from done Run must return.
	Run(up chan<- config.TargetGroup, done <-chan struct{})
}
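
// As a rough sketch, a provider for a single, fixed group could satisfy this
// contract as follows (the name oneShotProvider and its contents are purely
// illustrative; StaticProvider at the end of this file is a real implementation):
//
//	type oneShotProvider struct{ tg config.TargetGroup }
//
//	func (p *oneShotProvider) Sources() []string { return []string{p.tg.Source} }
//
//	func (p *oneShotProvider) Run(up chan<- config.TargetGroup, done <-chan struct{}) {
//		defer close(up)
//		select {
//		case up <- p.tg:
//		case <-done:
//			return
//		}
//		<-done
//	}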

// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
	mtx            sync.RWMutex
	sampleAppender storage.SampleAppender
	running        bool
	done           chan struct{}

	// Targets by their source ID.
	targets map[string][]*Target
	// Providers by the scrape configs they are derived from.
	providers map[*config.ScrapeConfig][]TargetProvider
}

// NewTargetManager creates a new TargetManager.
func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
	tm := &TargetManager{
		sampleAppender: sampleAppender,
		targets:        map[string][]*Target{},
	}
	return tm
}
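
// A minimal usage sketch, assuming an appender and a loaded configuration are
// available from the surrounding program (variable names are illustrative):
//
//	tm := NewTargetManager(appender)
//	tm.ApplyConfig(conf) // set up providers from the scrape configs
//	tm.Run()             // start scraping and handling target updates
//	// ...
//	pools := tm.Pools()  // targets currently scraped, keyed by job name
//	tm.Stop()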

// merge multiple target group channels into a single output channel.
func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate {
	var wg sync.WaitGroup
	out := make(chan targetGroupUpdate)

	// Start an output goroutine for each input channel in cs. redir
	// copies values from c to out until c or done is closed, then calls
	// wg.Done.
	redir := func(c <-chan targetGroupUpdate) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go redir(c)
	}

	// Close the out channel if all inbound channels are closed.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
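
// merge is the usual fan-in idiom: merge(done, c1, c2) returns a channel that
// carries the updates of both c1 and c2 in arbitrary order and that is closed
// once every input channel has been closed or done has been closed.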

// targetGroupUpdate is a potentially changed/new target group
// for the given scrape configuration.
type targetGroupUpdate struct {
	tg   config.TargetGroup
	scfg *config.ScrapeConfig
}

// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
	log.Info("Starting target manager...")

	tm.done = make(chan struct{})

	sources := map[string]struct{}{}
	updates := []<-chan targetGroupUpdate{}

	for scfg, provs := range tm.providers {
		for _, prov := range provs {
			// Get an initial set of available sources so we don't remove
			// target groups from the last run that are still available.
			for _, src := range prov.Sources() {
				sources[src] = struct{}{}
			}

			tgc := make(chan config.TargetGroup)
			// Run the target provider after cleanup of the stale targets is done.
			defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) {
				go prov.Run(tgc, done)
			}(prov, tgc, tm.done)

			tgupc := make(chan targetGroupUpdate)
			updates = append(updates, tgupc)

			go func(scfg *config.ScrapeConfig, done <-chan struct{}) {
				defer close(tgupc)
				for {
					select {
					case tg := <-tgc:
						tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
					case <-done:
						return
					}
				}
			}(scfg, tm.done)
		}
	}

	// Merge all channels of incoming target group updates into a single
	// one and keep applying the updates.
	go tm.handleUpdates(merge(tm.done, updates...), tm.done)

	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	// Remove old target groups that are no longer in the set of sources.
	tm.removeTargets(func(src string) bool {
		if _, ok := sources[src]; ok {
			return false
		}
		return true
	})

	tm.running = true
}

// handleUpdates receives target group updates and handles them in the
// context of the given job config.
func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) {
	for {
		select {
		case update, ok := <-ch:
			if !ok {
				return
			}
			log.Debugf("Received potential update for target group %q", update.tg.Source)

			if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil {
				log.Errorf("Error updating targets: %s", err)
			}
		case <-done:
			return
		}
	}
}

// Stop all background processing.
func (tm *TargetManager) Stop() {
	tm.mtx.RLock()
	if tm.running {
		defer tm.stop(true)
	}
	// Return the lock before calling tm.stop().
	defer tm.mtx.RUnlock()
}

// stop background processing of the target manager. If removeTargets is true,
// existing targets will be stopped and removed.
func (tm *TargetManager) stop(removeTargets bool) {
	log.Info("Stopping target manager...")
	defer log.Info("Target manager stopped.")

	close(tm.done)

	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	if removeTargets {
		tm.removeTargets(nil)
	}

	tm.running = false
}

// removeTargets stops and removes targets for sources where f(source) is true
// or if f is nil. This method is not thread-safe.
func (tm *TargetManager) removeTargets(f func(string) bool) {
	if f == nil {
		f = func(string) bool { return true }
	}
	var wg sync.WaitGroup
	for src, targets := range tm.targets {
		if !f(src) {
			continue
		}
		wg.Add(len(targets))
		for _, target := range targets {
			go func(t *Target) {
				t.StopScraper()
				wg.Done()
			}(target)
		}
		delete(tm.targets, src)
	}
	wg.Wait()
}

// updateTargetGroup creates new targets for the group and replaces the old targets
// for the source ID.
func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error {
	newTargets, err := tm.targetsFromGroup(tgroup, cfg)
	if err != nil {
		return err
	}

	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	if !tm.running {
		return nil
	}

	oldTargets, ok := tm.targets[tgroup.Source]
	if ok {
		var wg sync.WaitGroup
		// Replace the old targets with the new ones while keeping the state
		// of intersecting targets.
		for i, tnew := range newTargets {
			var match *Target
			for j, told := range oldTargets {
				if told == nil {
					continue
				}
				if tnew.InstanceIdentifier() == told.InstanceIdentifier() {
					match = told
					oldTargets[j] = nil
					break
				}
			}
			// Update the existing target and discard the new equivalent.
			// Otherwise start scraping the new target.
			if match != nil {
				// Updating is blocked during a scrape. We don't want those wait times
				// to build up.
				wg.Add(1)
				go func(t *Target) {
					match.Update(cfg, t.fullLabels(), t.metaLabels)
					wg.Done()
				}(tnew)
				newTargets[i] = match
			} else {
				go tnew.RunScraper(tm.sampleAppender)
			}
		}
		// Remove all old targets that disappeared.
		for _, told := range oldTargets {
			if told != nil {
				wg.Add(1)
				go func(t *Target) {
					t.StopScraper()
					wg.Done()
				}(told)
			}
		}
		wg.Wait()
	} else {
		// The source ID is new, start all target scrapers.
		for _, tnew := range newTargets {
			go tnew.RunScraper(tm.sampleAppender)
		}
	}

	if len(newTargets) > 0 {
		tm.targets[tgroup.Source] = newTargets
	} else {
		delete(tm.targets, tgroup.Source)
	}
	return nil
}

// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string][]*Target {
	tm.mtx.RLock()
	defer tm.mtx.RUnlock()

	pools := map[string][]*Target{}

	for _, ts := range tm.targets {
		for _, t := range ts {
			job := string(t.BaseLabels()[model.JobLabel])
			pools[job] = append(pools[job], t)
		}
	}
	return pools
}

// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
// Returns true on success.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
	tm.mtx.RLock()
	running := tm.running
	tm.mtx.RUnlock()

	if running {
		tm.stop(false)
		// Even if updating the config failed, we want to continue rather than stop scraping anything.
		defer tm.Run()
	}
	providers := map[*config.ScrapeConfig][]TargetProvider{}

	for _, scfg := range cfg.ScrapeConfigs {
		providers[scfg] = providersFromConfig(scfg)
	}

	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	tm.providers = providers
	return true
}

// prefixedTargetProvider wraps TargetProvider and prefixes source strings
// to make the sources unique across a configuration.
type prefixedTargetProvider struct {
	TargetProvider

	job       string
	mechanism string
	idx       int
}

func (tp *prefixedTargetProvider) prefix(src string) string {
	return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src)
}
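
// For illustration, a DNS provider wrapped for job "node" at index 0 would
// report a source "srv.example.com" as "node:dns:0:srv.example.com" (the job
// and source values here are made up).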

func (tp *prefixedTargetProvider) Sources() []string {
	srcs := tp.TargetProvider.Sources()
	for i, src := range srcs {
		srcs[i] = tp.prefix(src)
	}

	return srcs
}

func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
	defer close(ch)

	ch2 := make(chan config.TargetGroup)
	go tp.TargetProvider.Run(ch2, done)

	for {
		select {
		case <-done:
			return
		case tg := <-ch2:
			tg.Source = tp.prefix(tg.Source)
			ch <- tg
		}
	}
}

// providersFromConfig returns all TargetProviders configured in cfg.
func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
	var providers []TargetProvider

	app := func(mech string, i int, tp TargetProvider) {
		providers = append(providers, &prefixedTargetProvider{
			job:            cfg.JobName,
			mechanism:      mech,
			idx:            i,
			TargetProvider: tp,
		})
	}

	for i, c := range cfg.DNSSDConfigs {
		app("dns", i, discovery.NewDNSDiscovery(c))
	}
	for i, c := range cfg.FileSDConfigs {
		app("file", i, discovery.NewFileDiscovery(c))
	}
	for i, c := range cfg.ConsulSDConfigs {
		app("consul", i, discovery.NewConsulDiscovery(c))
	}
	for i, c := range cfg.MarathonSDConfigs {
		app("marathon", i, discovery.NewMarathonDiscovery(c))
	}
	for i, c := range cfg.KubernetesSDConfigs {
		k, err := discovery.NewKubernetesDiscovery(c)
		if err != nil {
			log.Errorf("Cannot create Kubernetes discovery: %s", err)
			continue
		}
		app("kubernetes", i, k)
	}
	for i, c := range cfg.ServersetSDConfigs {
		app("serverset", i, discovery.NewServersetDiscovery(c))
	}
	for i, c := range cfg.EC2SDConfigs {
		app("ec2", i, discovery.NewEC2Discovery(c))
	}
	if len(cfg.TargetGroups) > 0 {
		app("static", 0, NewStaticProvider(cfg.TargetGroups))
	}

	return providers
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
	tm.mtx.RLock()
	defer tm.mtx.RUnlock()

	targets := make([]*Target, 0, len(tg.Targets))
	for i, labels := range tg.Targets {
		addr := string(labels[model.AddressLabel])
		// If no port was provided, infer it based on the used scheme.
		if !strings.Contains(addr, ":") {
			switch cfg.Scheme {
			case "http":
				addr = fmt.Sprintf("%s:80", addr)
			case "https":
				addr = fmt.Sprintf("%s:443", addr)
			default:
				panic(fmt.Errorf("targetsFromGroup: invalid scheme %q", cfg.Scheme))
			}
			labels[model.AddressLabel] = model.LabelValue(addr)
		}
		for k, v := range cfg.Params {
			if len(v) > 0 {
				labels[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
			}
		}
		// Copy labels into the labelset for the target if they are not
		// set already. Apply the labelsets in order of decreasing precedence.
		labelsets := []model.LabelSet{
			tg.Labels,
			{
				model.SchemeLabel:      model.LabelValue(cfg.Scheme),
				model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
				model.JobLabel:         model.LabelValue(cfg.JobName),
			},
		}
		for _, lset := range labelsets {
			for ln, lv := range lset {
				if _, ok := labels[ln]; !ok {
					labels[ln] = lv
				}
			}
		}

		if _, ok := labels[model.AddressLabel]; !ok {
			return nil, fmt.Errorf("instance %d in target group %s has no address", i, tg)
		}

		preRelabelLabels := labels

		labels, err := Relabel(labels, cfg.RelabelConfigs...)
		if err != nil {
			return nil, fmt.Errorf("error while relabeling instance %d in target group %s: %s", i, tg, err)
		}
		// Check if the target was dropped.
		if labels == nil {
			continue
		}

		for ln := range labels {
			// Meta labels are deleted after relabelling. Other internal labels propagate to
			// the target, which decides whether they will be part of its label set.
			if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
				delete(labels, ln)
			}
		}
		tr := NewTarget(cfg, labels, preRelabelLabels)
		targets = append(targets, tr)
	}

	return targets, nil
}

// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
	TargetGroups []*config.TargetGroup
}

// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
	for i, tg := range groups {
		tg.Source = fmt.Sprintf("%d", i)
	}
	return &StaticProvider{
		TargetGroups: groups,
	}
}
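
// A usage sketch with a single, made-up target group:
//
//	sp := NewStaticProvider([]*config.TargetGroup{
//		{Targets: []model.LabelSet{{model.AddressLabel: "localhost:9090"}}},
//	})
//	// sp.Sources() now returns []string{"0"}: the group's index is its source.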

// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
	defer close(ch)

	for _, tg := range sd.TargetGroups {
		select {
		case <-done:
			return
		case ch <- *tg:
		}
	}
	<-done
}

// Sources returns the provider's sources.
func (sd *StaticProvider) Sources() (srcs []string) {
	for _, tg := range sd.TargetGroups {
		srcs = append(srcs, tg.Source)
	}
	return srcs
}