mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
372 lines
11 KiB
372 lines
11 KiB
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
package scrape |
|
|
|
import ( |
|
"errors" |
|
"fmt" |
|
"hash/fnv" |
|
"log/slog" |
|
"reflect" |
|
"sync" |
|
"time" |
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
config_util "github.com/prometheus/common/config" |
|
"github.com/prometheus/common/model" |
|
"github.com/prometheus/common/promslog" |
|
|
|
"github.com/prometheus/prometheus/config" |
|
"github.com/prometheus/prometheus/discovery/targetgroup" |
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/util/logging" |
|
"github.com/prometheus/prometheus/util/osutil" |
|
"github.com/prometheus/prometheus/util/pool" |
|
) |
|
|
|
// NewManager is the Manager constructor. |
|
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) { |
|
if o == nil { |
|
o = &Options{} |
|
} |
|
if logger == nil { |
|
logger = promslog.NewNopLogger() |
|
} |
|
|
|
sm, err := newScrapeMetrics(registerer) |
|
if err != nil { |
|
return nil, fmt.Errorf("failed to create scrape manager due to error: %w", err) |
|
} |
|
|
|
m := &Manager{ |
|
append: app, |
|
opts: o, |
|
logger: logger, |
|
newScrapeFailureLogger: newScrapeFailureLogger, |
|
scrapeConfigs: make(map[string]*config.ScrapeConfig), |
|
scrapePools: make(map[string]*scrapePool), |
|
graceShut: make(chan struct{}), |
|
triggerReload: make(chan struct{}, 1), |
|
metrics: sm, |
|
buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }), |
|
} |
|
|
|
m.metrics.setTargetMetadataCacheGatherer(m) |
|
|
|
return m, nil |
|
} |
|
|
|
// Options are the configuration parameters to the scrape manager. |
|
type Options struct { |
|
ExtraMetrics bool |
|
// Option used by downstream scraper users like OpenTelemetry Collector |
|
// to help lookup metric metadata. Should be false for Prometheus. |
|
PassMetadataInContext bool |
|
// Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders |
|
// can decide what to do with metadata, but for practical purposes this flag exists so that metadata |
|
// can be written to the WAL and thus read for remote write. |
|
// TODO: implement some form of metadata storage |
|
AppendMetadata bool |
|
// Option to increase the interval used by scrape manager to throttle target groups updates. |
|
DiscoveryReloadInterval model.Duration |
|
// Option to enable the ingestion of the created timestamp as a synthetic zero sample. |
|
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md |
|
EnableCreatedTimestampZeroIngestion bool |
|
// Option to enable the ingestion of native histograms. |
|
EnableNativeHistogramsIngestion bool |
|
|
|
// Optional HTTP client options to use when scraping. |
|
HTTPClientOptions []config_util.HTTPClientOption |
|
|
|
// private option for testability. |
|
skipOffsetting bool |
|
} |
|
|
|
// Manager maintains a set of scrape pools and manages start/stop cycles |
|
// when receiving new target groups from the discovery manager. |
|
type Manager struct { |
|
opts *Options |
|
logger *slog.Logger |
|
append storage.Appendable |
|
graceShut chan struct{} |
|
|
|
offsetSeed uint64 // Global offsetSeed seed is used to spread scrape workload across HA setup. |
|
mtxScrape sync.Mutex // Guards the fields below. |
|
scrapeConfigs map[string]*config.ScrapeConfig |
|
scrapePools map[string]*scrapePool |
|
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error) |
|
scrapeFailureLoggers map[string]*logging.JSONFileLogger |
|
targetSets map[string][]*targetgroup.Group |
|
buffers *pool.Pool |
|
|
|
triggerReload chan struct{} |
|
|
|
metrics *scrapeMetrics |
|
} |
|
|
|
// Run receives and saves target set updates and triggers the scraping loops reloading. |
|
// Reloading happens in the background so that it doesn't block receiving targets updates. |
|
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error { |
|
go m.reloader() |
|
for { |
|
select { |
|
case ts := <-tsets: |
|
m.updateTsets(ts) |
|
|
|
select { |
|
case m.triggerReload <- struct{}{}: |
|
default: |
|
} |
|
|
|
case <-m.graceShut: |
|
return nil |
|
} |
|
} |
|
} |
|
|
|
// UnregisterMetrics unregisters manager metrics. |
|
func (m *Manager) UnregisterMetrics() { |
|
m.metrics.Unregister() |
|
} |
|
|
|
func (m *Manager) reloader() { |
|
reloadIntervalDuration := m.opts.DiscoveryReloadInterval |
|
if reloadIntervalDuration == model.Duration(0) { |
|
reloadIntervalDuration = model.Duration(5 * time.Second) |
|
} |
|
|
|
ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) |
|
|
|
defer ticker.Stop() |
|
|
|
for { |
|
select { |
|
case <-m.graceShut: |
|
return |
|
case <-ticker.C: |
|
select { |
|
case <-m.triggerReload: |
|
m.reload() |
|
case <-m.graceShut: |
|
return |
|
} |
|
} |
|
} |
|
} |
|
|
|
func (m *Manager) reload() { |
|
m.mtxScrape.Lock() |
|
var wg sync.WaitGroup |
|
for setName, groups := range m.targetSets { |
|
if _, ok := m.scrapePools[setName]; !ok { |
|
scrapeConfig, ok := m.scrapeConfigs[setName] |
|
if !ok { |
|
m.logger.Error("error reloading target set", "err", "invalid config id:"+setName) |
|
continue |
|
} |
|
if scrapeConfig.ConvertClassicHistogramsToNHCB && m.opts.EnableCreatedTimestampZeroIngestion { |
|
// TODO(krajorama): fix https://github.com/prometheus/prometheus/issues/15137 |
|
m.logger.Error("error reloading target set", "err", "cannot convert classic histograms to native histograms with custom buckets and ingest created timestamp zero samples at the same time due to https://github.com/prometheus/prometheus/issues/15137") |
|
continue |
|
} |
|
m.metrics.targetScrapePools.Inc() |
|
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics) |
|
if err != nil { |
|
m.metrics.targetScrapePoolsFailed.Inc() |
|
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName) |
|
continue |
|
} |
|
m.scrapePools[setName] = sp |
|
if l, ok := m.scrapeFailureLoggers[scrapeConfig.ScrapeFailureLogFile]; ok { |
|
sp.SetScrapeFailureLogger(l) |
|
} else { |
|
sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", setName) |
|
} |
|
} |
|
|
|
wg.Add(1) |
|
// Run the sync in parallel as these take a while and at high load can't catch up. |
|
go func(sp *scrapePool, groups []*targetgroup.Group) { |
|
sp.Sync(groups) |
|
wg.Done() |
|
}(m.scrapePools[setName], groups) |
|
} |
|
m.mtxScrape.Unlock() |
|
wg.Wait() |
|
} |
|
|
|
// setOffsetSeed calculates a global offsetSeed per server relying on extra label set. |
|
func (m *Manager) setOffsetSeed(labels labels.Labels) error { |
|
h := fnv.New64a() |
|
hostname, err := osutil.GetFQDN() |
|
if err != nil { |
|
return err |
|
} |
|
if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil { |
|
return err |
|
} |
|
m.offsetSeed = h.Sum64() |
|
return nil |
|
} |
|
|
|
// Stop cancels all running scrape pools and blocks until all have exited. |
|
func (m *Manager) Stop() { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
for _, sp := range m.scrapePools { |
|
sp.stop() |
|
} |
|
close(m.graceShut) |
|
} |
|
|
|
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) { |
|
m.mtxScrape.Lock() |
|
m.targetSets = tsets |
|
m.mtxScrape.Unlock() |
|
} |
|
|
|
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. |
|
func (m *Manager) ApplyConfig(cfg *config.Config) error { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
scfgs, err := cfg.GetScrapeConfigs() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
c := make(map[string]*config.ScrapeConfig) |
|
scrapeFailureLoggers := map[string]*logging.JSONFileLogger{ |
|
"": nil, // Emptying the file name sets the scrape logger to nil. |
|
} |
|
for _, scfg := range scfgs { |
|
c[scfg.JobName] = scfg |
|
if _, ok := scrapeFailureLoggers[scfg.ScrapeFailureLogFile]; !ok { |
|
// We promise to reopen the file on each reload. |
|
var ( |
|
logger *logging.JSONFileLogger |
|
err error |
|
) |
|
if m.newScrapeFailureLogger != nil { |
|
if logger, err = m.newScrapeFailureLogger(scfg.ScrapeFailureLogFile); err != nil { |
|
return err |
|
} |
|
} |
|
scrapeFailureLoggers[scfg.ScrapeFailureLogFile] = logger |
|
} |
|
} |
|
m.scrapeConfigs = c |
|
|
|
oldScrapeFailureLoggers := m.scrapeFailureLoggers |
|
for _, s := range oldScrapeFailureLoggers { |
|
if s != nil { |
|
defer s.Close() |
|
} |
|
} |
|
|
|
m.scrapeFailureLoggers = scrapeFailureLoggers |
|
|
|
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { |
|
return err |
|
} |
|
|
|
// Cleanup and reload pool if the configuration has changed. |
|
var failed bool |
|
for name, sp := range m.scrapePools { |
|
switch cfg, ok := m.scrapeConfigs[name]; { |
|
case !ok: |
|
sp.stop() |
|
delete(m.scrapePools, name) |
|
case !reflect.DeepEqual(sp.config, cfg): |
|
err := sp.reload(cfg) |
|
if err != nil { |
|
m.logger.Error("error reloading scrape pool", "err", err, "scrape_pool", name) |
|
failed = true |
|
} |
|
fallthrough |
|
case ok: |
|
if l, ok := m.scrapeFailureLoggers[cfg.ScrapeFailureLogFile]; ok { |
|
sp.SetScrapeFailureLogger(l) |
|
} else { |
|
sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", name) |
|
} |
|
} |
|
} |
|
|
|
if failed { |
|
return errors.New("failed to apply the new configuration") |
|
} |
|
return nil |
|
} |
|
|
|
// TargetsAll returns active and dropped targets grouped by job_name. |
|
func (m *Manager) TargetsAll() map[string][]*Target { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
targets := make(map[string][]*Target, len(m.scrapePools)) |
|
for tset, sp := range m.scrapePools { |
|
targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...) |
|
} |
|
return targets |
|
} |
|
|
|
// ScrapePools returns the list of all scrape pool names. |
|
func (m *Manager) ScrapePools() []string { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
names := make([]string, 0, len(m.scrapePools)) |
|
for name := range m.scrapePools { |
|
names = append(names, name) |
|
} |
|
return names |
|
} |
|
|
|
// TargetsActive returns the active targets currently being scraped. |
|
func (m *Manager) TargetsActive() map[string][]*Target { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
targets := make(map[string][]*Target, len(m.scrapePools)) |
|
for tset, sp := range m.scrapePools { |
|
targets[tset] = sp.ActiveTargets() |
|
} |
|
return targets |
|
} |
|
|
|
// TargetsDropped returns the dropped targets during relabelling, subject to KeepDroppedTargets limit. |
|
func (m *Manager) TargetsDropped() map[string][]*Target { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
targets := make(map[string][]*Target, len(m.scrapePools)) |
|
for tset, sp := range m.scrapePools { |
|
targets[tset] = sp.DroppedTargets() |
|
} |
|
return targets |
|
} |
|
|
|
func (m *Manager) TargetsDroppedCounts() map[string]int { |
|
m.mtxScrape.Lock() |
|
defer m.mtxScrape.Unlock() |
|
|
|
counts := make(map[string]int, len(m.scrapePools)) |
|
for tset, sp := range m.scrapePools { |
|
counts[tset] = sp.droppedTargetsCount |
|
} |
|
return counts |
|
}
|
|
|