// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/relabel"
	"github.com/prometheus/prometheus/retrieval/discovery"
	"github.com/prometheus/prometheus/storage"
)

// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// Providers must initially send all known target groups as soon as they can.
type TargetProvider interface {
	// Run hands a channel to the target provider through which it can send
	// updated target groups. The channel must be closed by the target provider
	// if no more updates will be sent.
	// Run must return when the context is canceled.
	Run(ctx context.Context, up chan<- []*config.TargetGroup)
}

// TargetManager maintains a set of targets, starts and stops their scraping, and
// creates new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
	appender      storage.SampleAppender
	scrapeConfigs []*config.ScrapeConfig

	mtx    sync.RWMutex
	ctx    context.Context
	cancel func()
	wg     sync.WaitGroup

	// Set of unique targets by scrape configuration.
	targetSets map[string]*targetSet
}

// NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.SampleAppender) *TargetManager {
	return &TargetManager{
		appender:   app,
		targetSets: map[string]*targetSet{},
	}
}

// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
	log.Info("Starting target manager...")

	tm.mtx.Lock()

	tm.ctx, tm.cancel = context.WithCancel(context.Background())
	tm.reload()

	tm.mtx.Unlock()

	tm.wg.Wait()
}

// Stop all background processing.
func (tm *TargetManager) Stop() {
	log.Infoln("Stopping target manager...")

	tm.mtx.Lock()
	// Cancel the base context. This causes all target providers to shut down
	// and all in-flight scrapes to abort immediately.
	// Started inserts will be finished before terminating.
	tm.cancel()
	tm.mtx.Unlock()

	// Wait for all scrape inserts to complete.
	tm.wg.Wait()

	log.Debugln("Target manager stopped")
}

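// reload starts scraping for newly added scrape configurations, reconfigures
// target sets whose job already exists, and stops target sets whose job is
// no longer configured. The caller must hold tm.mtx.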
func (tm *TargetManager) reload() {
	jobs := map[string]struct{}{}

	// Start new target sets and update existing ones.
	for _, scfg := range tm.scrapeConfigs {
		jobs[scfg.JobName] = struct{}{}

		ts, ok := tm.targetSets[scfg.JobName]
		if !ok {
			ts = newTargetSet(scfg, tm.appender)
			tm.targetSets[scfg.JobName] = ts

			tm.wg.Add(1)

			go func(ts *targetSet) {
				ts.runScraping(tm.ctx)
				tm.wg.Done()
			}(ts)
		} else {
			ts.reload(scfg)
		}
		ts.runProviders(tm.ctx, providersFromConfig(scfg))
	}

	// Remove old target sets. Waiting for stopping is already guaranteed
	// by the goroutine that started the target set.
	for name, ts := range tm.targetSets {
		if _, ok := jobs[name]; !ok {
			ts.cancel()
			delete(tm.targetSets, name)
		}
	}
}

// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string]Targets {
	tm.mtx.RLock()
	defer tm.mtx.RUnlock()

	pools := map[string]Targets{}

	// TODO(fabxc): this is just a hack to maintain compatibility for now.
	for _, ps := range tm.targetSets {
		ps.scrapePool.mtx.RLock()

		for _, t := range ps.scrapePool.targets {
			job := string(t.Labels()[model.JobLabel])
			pools[job] = append(pools[job], t)
		}

		ps.scrapePool.mtx.RUnlock()
	}
	for _, targets := range pools {
		sort.Sort(targets)
	}
	return pools
}

// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	tm.scrapeConfigs = cfg.ScrapeConfigs

	if tm.ctx != nil {
		tm.reload()
	}
	return nil
}

// targetSet holds several TargetProviders for which the same scrape configuration
// is used. It maintains target groups from all given providers and syncs them
// to a scrape pool.
type targetSet struct {
	mtx sync.RWMutex

	// Sets of targets by a source string that is unique across target providers.
	tgroups map[string][]*Target

	scrapePool *scrapePool
	config     *config.ScrapeConfig

	syncCh          chan struct{}
	cancelScraping  func()
	cancelProviders func()
}

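// newTargetSet returns a new target set for the given scrape configuration
// that syncs its targets into a dedicated scrape pool appending to app.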
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
	ts := &targetSet{
		scrapePool: newScrapePool(cfg, app),
		syncCh:     make(chan struct{}, 1),
		config:     cfg,
	}
	return ts
}

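// cancel stops scraping and shuts down the running target providers of the target set.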
func (ts *targetSet) cancel() {
	ts.mtx.RLock()
	defer ts.mtx.RUnlock()

	if ts.cancelScraping != nil {
		ts.cancelScraping()
	}
	if ts.cancelProviders != nil {
		ts.cancelProviders()
	}
}

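// reload applies a new scrape configuration to the target set and its scrape pool.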
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
	ts.mtx.Lock()
	ts.config = cfg
	ts.mtx.Unlock()

	ts.scrapePool.reload(cfg)
}

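// runScraping initializes the scrape pool and syncs it with the current target
// groups whenever signaled via syncCh, throttled to at most once per five
// seconds, until ctx is canceled.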
func (ts *targetSet) runScraping(ctx context.Context) {
	ctx, ts.cancelScraping = context.WithCancel(ctx)

	ts.scrapePool.init(ctx)

Loop:
	for {
		// Throttle syncing to once per five seconds.
		select {
		case <-ctx.Done():
			break Loop
		case <-time.After(5 * time.Second):
		}

		select {
		case <-ctx.Done():
			break Loop
		case <-ts.syncCh:
			ts.mtx.RLock()
			ts.sync()
			ts.mtx.RUnlock()
		}
	}

	// Wait for all pending target scrapes to complete to ensure there will be
	// no more storage writes after this point.
	ts.scrapePool.stop()
}

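// sync flattens all current target groups into a single target list and hands
// it to the scrape pool. Callers must hold ts.mtx for reading.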
func (ts *targetSet) sync() {
	var all []*Target
	for _, targets := range ts.tgroups {
		all = append(all, targets...)
	}
	ts.scrapePool.sync(all)
}

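// runProviders stops any previously running target providers, starts the given
// ones, and blocks until each provider has delivered its initial target groups
// or timed out. It then signals the scraping loop to sync.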
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
	// Lock for the entire time. This may mean up to 5 seconds until the full initial set
	// is retrieved and applied.
	// We could release earlier with some tweaks, but this is easier to reason about.
	ts.mtx.Lock()
	defer ts.mtx.Unlock()

	var wg sync.WaitGroup

	if ts.cancelProviders != nil {
		ts.cancelProviders()
	}
	ctx, ts.cancelProviders = context.WithCancel(ctx)

	// (Re-)create a fresh tgroups map to not keep stale targets around. We
	// will retrieve all targets below anyway, so cleaning up everything is
	// safe and doesn't inflict any additional cost.
	ts.tgroups = map[string][]*Target{}

	for name, prov := range providers {
		wg.Add(1)

		updates := make(chan []*config.TargetGroup)

		go func(name string, prov TargetProvider) {
			select {
			case <-ctx.Done():
			case initial, ok := <-updates:
				// Handle the case that a target provider exits and closes the channel
				// before the context is done.
				if !ok {
					break
				}
				// First set of all targets the provider knows.
				for _, tgroup := range initial {
					if tgroup == nil {
						continue
					}
					targets, err := targetsFromGroup(tgroup, ts.config)
					if err != nil {
						log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
						continue
					}
					ts.tgroups[name+"/"+tgroup.Source] = targets
				}
			case <-time.After(5 * time.Second):
				// Initial set didn't arrive. Act as if it was empty
				// and wait for updates later on.
			}

			wg.Done()

			// Start listening for further updates.
			for {
				select {
				case <-ctx.Done():
					return
				case tgs, ok := <-updates:
					// Handle the case that a target provider exits and closes the channel
					// before the context is done.
					if !ok {
						return
					}
					for _, tg := range tgs {
						if err := ts.update(name, tg); err != nil {
							log.With("target_group", tg).Errorf("Target update failed: %s", err)
						}
					}
				}
			}
		}(name, prov)

		go prov.Run(ctx, updates)
	}

	// We wait for a full initial set of target groups before releasing the mutex
	// to ensure the initial sync is complete and there are no races with subsequent updates.
	wg.Wait()
	// Just signal that there are initial sets to sync now. Actual syncing must only
	// happen in the runScraping loop.
	select {
	case ts.syncCh <- struct{}{}:
	default:
	}
}

// update handles a target group update from a target provider identified by the name.
func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
	if tgroup == nil {
		return nil
	}
	targets, err := targetsFromGroup(tgroup, ts.config)
	if err != nil {
		return err
	}

	ts.mtx.Lock()
	defer ts.mtx.Unlock()

	ts.tgroups[name+"/"+tgroup.Source] = targets

	select {
	case ts.syncCh <- struct{}{}:
	default:
	}

	return nil
}

// providersFromConfig returns all TargetProviders configured in cfg.
func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
	providers := map[string]TargetProvider{}

	app := func(mech string, i int, tp TargetProvider) {
		providers[fmt.Sprintf("%s/%d", mech, i)] = tp
	}

	for i, c := range cfg.DNSSDConfigs {
		app("dns", i, discovery.NewDNS(c))
	}
	for i, c := range cfg.FileSDConfigs {
		app("file", i, discovery.NewFileDiscovery(c))
	}
	for i, c := range cfg.ConsulSDConfigs {
		k, err := discovery.NewConsul(c)
		if err != nil {
			log.Errorf("Cannot create Consul discovery: %s", err)
			continue
		}
		app("consul", i, k)
	}
	for i, c := range cfg.MarathonSDConfigs {
		app("marathon", i, discovery.NewMarathon(c))
	}
	for i, c := range cfg.KubernetesSDConfigs {
		k, err := discovery.NewKubernetesDiscovery(c)
		if err != nil {
			log.Errorf("Cannot create Kubernetes discovery: %s", err)
			continue
		}
		app("kubernetes", i, k)
	}
	for i, c := range cfg.ServersetSDConfigs {
		app("serverset", i, discovery.NewServersetDiscovery(c))
	}
	for i, c := range cfg.NerveSDConfigs {
		app("nerve", i, discovery.NewNerveDiscovery(c))
	}
	for i, c := range cfg.EC2SDConfigs {
		app("ec2", i, discovery.NewEC2Discovery(c))
	}
	for i, c := range cfg.GCESDConfigs {
		gced, err := discovery.NewGCEDiscovery(c)
		if err != nil {
			log.Errorf("Cannot initialize GCE discovery: %s", err)
			continue
		}
		app("gce", i, gced)
	}
	for i, c := range cfg.AzureSDConfigs {
		app("azure", i, discovery.NewAzureDiscovery(c))
	}
	if len(cfg.StaticConfigs) > 0 {
		app("static", 0, NewStaticProvider(cfg.StaticConfigs))
	}

	return providers
}

// populateLabels builds a label set from the given label set and scrape configuration.
// The second return value is the label set before relabeling was applied.
// It returns a nil label set if the target is dropped during relabeling.
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
	if _, ok := lset[model.AddressLabel]; !ok {
		return nil, nil, fmt.Errorf("no address")
	}
	// Copy labels into the labelset for the target if they are not
	// set already. Apply the labelsets in order of decreasing precedence.
	scrapeLabels := model.LabelSet{
		model.SchemeLabel:      model.LabelValue(cfg.Scheme),
		model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
		model.JobLabel:         model.LabelValue(cfg.JobName),
	}
	for ln, lv := range scrapeLabels {
		if _, ok := lset[ln]; !ok {
			lset[ln] = lv
		}
	}
	// Encode scrape query parameters as labels.
	for k, v := range cfg.Params {
		if len(v) > 0 {
			lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
		}
	}

	preRelabelLabels := lset
	lset = relabel.Process(lset, cfg.RelabelConfigs...)

	// Check if the target was dropped.
	if lset == nil {
		return nil, nil, nil
	}

	// addPort checks whether we should add a default port to the address.
	// If the address is not valid, we don't append a port either.
	addPort := func(s string) bool {
		// If we can split, a port exists and we don't have to add one.
		if _, _, err := net.SplitHostPort(s); err == nil {
			return false
		}
		// If adding a port makes it valid, the previous error
		// was not due to an invalid address and we can append a port.
		_, _, err := net.SplitHostPort(s + ":1234")
		return err == nil
	}
	// If it's an address with no trailing port, infer it based on the used scheme.
	if addr := string(lset[model.AddressLabel]); addPort(addr) {
		// Addresses reaching this point are already wrapped in [] if necessary.
		switch lset[model.SchemeLabel] {
		case "http", "":
			addr = addr + ":80"
		case "https":
			addr = addr + ":443"
		default:
			return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
		}
		lset[model.AddressLabel] = model.LabelValue(addr)
	}
	if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
		return nil, nil, err
	}

	// Meta labels are deleted after relabelling. Other internal labels propagate to
	// the target which decides whether they will be part of their label set.
	for ln := range lset {
		if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
			delete(lset, ln)
		}
	}

	// Default the instance label to the target address.
	if _, ok := lset[model.InstanceLabel]; !ok {
		lset[model.InstanceLabel] = lset[model.AddressLabel]
	}
	return lset, preRelabelLabels, nil
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
	targets := make([]*Target, 0, len(tg.Targets))

	for i, lset := range tg.Targets {
		// Combine target labels with target group labels.
		for ln, lv := range tg.Labels {
			if _, ok := lset[ln]; !ok {
				lset[ln] = lv
			}
		}
		labels, origLabels, err := populateLabels(lset, cfg)
		if err != nil {
			return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
		}
		if labels != nil {
			targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
		}
	}
	return targets, nil
}

// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
	TargetGroups []*config.TargetGroup
}

// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
	for i, tg := range groups {
		tg.Source = fmt.Sprintf("%d", i)
	}
	return &StaticProvider{groups}
}

// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
	// We still have to consider that the consumer exits right away in which case
	// the context will be canceled.
	select {
	case ch <- sd.TargetGroups:
	case <-ctx.Done():
	}
	close(ch)
}