mirror of https://github.com/prometheus/prometheus
Merge pull request #11448 from codesome/gateproto
Gate protobuf scraping and update help text for enable-featurepull/11452/head
commit
8d045058c8
|
@ -198,6 +198,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
|
||||||
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
|
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
|
||||||
case "native-histograms":
|
case "native-histograms":
|
||||||
c.tsdb.EnableNativeHistograms = true
|
c.tsdb.EnableNativeHistograms = true
|
||||||
|
c.scrape.EnableProtobufNegotiation = true
|
||||||
level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
|
level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
|
||||||
case "":
|
case "":
|
||||||
continue
|
continue
|
||||||
|
@ -400,7 +401,7 @@ func main() {
|
||||||
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
|
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
|
||||||
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
|
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
|
||||||
|
|
||||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||||
Default("").StringsVar(&cfg.featureList)
|
Default("").StringsVar(&cfg.featureList)
|
||||||
|
|
||||||
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
||||||
|
|
|
@ -132,6 +132,9 @@ type Options struct {
|
||||||
// Option to enable the experimental in-memory metadata storage and append
|
// Option to enable the experimental in-memory metadata storage and append
|
||||||
// metadata to the WAL.
|
// metadata to the WAL.
|
||||||
EnableMetadataStorage bool
|
EnableMetadataStorage bool
|
||||||
|
// Option to enable protobuf negotiation with the client. Note that the client can already
|
||||||
|
// send protobuf without needing to enable this.
|
||||||
|
EnableProtobufNegotiation bool
|
||||||
// Option to increase the interval used by scrape manager to throttle target groups updates.
|
// Option to increase the interval used by scrape manager to throttle target groups updates.
|
||||||
DiscoveryReloadInterval model.Duration
|
DiscoveryReloadInterval model.Duration
|
||||||
|
|
||||||
|
|
|
@ -243,6 +243,8 @@ type scrapePool struct {
|
||||||
newLoop func(scrapeLoopOptions) loop
|
newLoop func(scrapeLoopOptions) loop
|
||||||
|
|
||||||
noDefaultPort bool
|
noDefaultPort bool
|
||||||
|
|
||||||
|
enableProtobufNegotiation bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type labelLimits struct {
|
type labelLimits struct {
|
||||||
|
@ -284,15 +286,16 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
sp := &scrapePool{
|
sp := &scrapePool{
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
appendable: app,
|
appendable: app,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
client: client,
|
client: client,
|
||||||
activeTargets: map[uint64]*Target{},
|
activeTargets: map[uint64]*Target{},
|
||||||
loops: map[uint64]loop{},
|
loops: map[uint64]loop{},
|
||||||
logger: logger,
|
logger: logger,
|
||||||
httpOpts: options.HTTPClientOptions,
|
httpOpts: options.HTTPClientOptions,
|
||||||
noDefaultPort: options.NoDefaultPort,
|
noDefaultPort: options.NoDefaultPort,
|
||||||
|
enableProtobufNegotiation: options.EnableProtobufNegotiation,
|
||||||
}
|
}
|
||||||
sp.newLoop = func(opts scrapeLoopOptions) loop {
|
sp.newLoop = func(opts scrapeLoopOptions) loop {
|
||||||
// Update the targets retrieval function for metadata to a new scrape cache.
|
// Update the targets retrieval function for metadata to a new scrape cache.
|
||||||
|
@ -433,8 +436,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
|
|
||||||
t := sp.activeTargets[fp]
|
t := sp.activeTargets[fp]
|
||||||
interval, timeout, err := t.intervalAndTimeout(interval, timeout)
|
interval, timeout, err := t.intervalAndTimeout(interval, timeout)
|
||||||
|
acceptHeader := scrapeAcceptHeader
|
||||||
|
if sp.enableProtobufNegotiation {
|
||||||
|
acceptHeader = scrapeAcceptHeaderWithProtobuf
|
||||||
|
}
|
||||||
var (
|
var (
|
||||||
s = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
|
s = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
|
||||||
newLoop = sp.newLoop(scrapeLoopOptions{
|
newLoop = sp.newLoop(scrapeLoopOptions{
|
||||||
target: t,
|
target: t,
|
||||||
scraper: s,
|
scraper: s,
|
||||||
|
@ -537,8 +544,11 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
// for every target.
|
// for every target.
|
||||||
var err error
|
var err error
|
||||||
interval, timeout, err = t.intervalAndTimeout(interval, timeout)
|
interval, timeout, err = t.intervalAndTimeout(interval, timeout)
|
||||||
|
acceptHeader := scrapeAcceptHeader
|
||||||
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
|
if sp.enableProtobufNegotiation {
|
||||||
|
acceptHeader = scrapeAcceptHeaderWithProtobuf
|
||||||
|
}
|
||||||
|
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
|
||||||
l := sp.newLoop(scrapeLoopOptions{
|
l := sp.newLoop(scrapeLoopOptions{
|
||||||
target: t,
|
target: t,
|
||||||
scraper: s,
|
scraper: s,
|
||||||
|
@ -757,11 +767,15 @@ type targetScraper struct {
|
||||||
buf *bufio.Reader
|
buf *bufio.Reader
|
||||||
|
|
||||||
bodySizeLimit int64
|
bodySizeLimit int64
|
||||||
|
acceptHeader string
|
||||||
}
|
}
|
||||||
|
|
||||||
var errBodySizeLimit = errors.New("body size limit exceeded")
|
var errBodySizeLimit = errors.New("body size limit exceeded")
|
||||||
|
|
||||||
const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
|
const (
|
||||||
|
scrapeAcceptHeader = `encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
|
||||||
|
scrapeAcceptHeaderWithProtobuf = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
|
||||||
|
)
|
||||||
|
|
||||||
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||||
|
|
||||||
|
@ -771,7 +785,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
req.Header.Add("Accept", acceptHeader)
|
req.Header.Add("Accept", s.acceptHeader)
|
||||||
req.Header.Add("Accept-Encoding", "gzip")
|
req.Header.Add("Accept-Encoding", "gzip")
|
||||||
req.Header.Set("User-Agent", UserAgent)
|
req.Header.Set("User-Agent", UserAgent)
|
||||||
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))
|
||||||
|
|
|
@ -2147,11 +2147,15 @@ func TestTargetScraperScrapeOK(t *testing.T) {
|
||||||
expectedTimeout = "1.5"
|
expectedTimeout = "1.5"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var protobufParsing bool
|
||||||
|
|
||||||
server := httptest.NewServer(
|
server := httptest.NewServer(
|
||||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
accept := r.Header.Get("Accept")
|
if protobufParsing {
|
||||||
if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
|
accept := r.Header.Get("Accept")
|
||||||
t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
|
if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
|
||||||
|
t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
|
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
|
||||||
|
@ -2170,22 +2174,29 @@ func TestTargetScraperScrapeOK(t *testing.T) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := &targetScraper{
|
runTest := func(acceptHeader string) {
|
||||||
Target: &Target{
|
ts := &targetScraper{
|
||||||
labels: labels.FromStrings(
|
Target: &Target{
|
||||||
model.SchemeLabel, serverURL.Scheme,
|
labels: labels.FromStrings(
|
||||||
model.AddressLabel, serverURL.Host,
|
model.SchemeLabel, serverURL.Scheme,
|
||||||
),
|
model.AddressLabel, serverURL.Host,
|
||||||
},
|
),
|
||||||
client: http.DefaultClient,
|
},
|
||||||
timeout: configTimeout,
|
client: http.DefaultClient,
|
||||||
}
|
timeout: configTimeout,
|
||||||
var buf bytes.Buffer
|
acceptHeader: acceptHeader,
|
||||||
|
}
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
contentType, err := ts.scrape(context.Background(), &buf)
|
contentType, err := ts.scrape(context.Background(), &buf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, "text/plain; version=0.0.4", contentType)
|
require.Equal(t, "text/plain; version=0.0.4", contentType)
|
||||||
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
|
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
runTest(scrapeAcceptHeader)
|
||||||
|
protobufParsing = true
|
||||||
|
runTest(scrapeAcceptHeaderWithProtobuf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetScrapeScrapeCancel(t *testing.T) {
|
func TestTargetScrapeScrapeCancel(t *testing.T) {
|
||||||
|
@ -2210,7 +2221,8 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
|
||||||
model.AddressLabel, serverURL.Host,
|
model.AddressLabel, serverURL.Host,
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
client: http.DefaultClient,
|
client: http.DefaultClient,
|
||||||
|
acceptHeader: scrapeAcceptHeader,
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
@ -2263,7 +2275,8 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
|
||||||
model.AddressLabel, serverURL.Host,
|
model.AddressLabel, serverURL.Host,
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
client: http.DefaultClient,
|
client: http.DefaultClient,
|
||||||
|
acceptHeader: scrapeAcceptHeader,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = ts.scrape(context.Background(), io.Discard)
|
_, err = ts.scrape(context.Background(), io.Discard)
|
||||||
|
@ -2305,6 +2318,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
|
||||||
},
|
},
|
||||||
client: http.DefaultClient,
|
client: http.DefaultClient,
|
||||||
bodySizeLimit: bodySizeLimit,
|
bodySizeLimit: bodySizeLimit,
|
||||||
|
acceptHeader: scrapeAcceptHeader,
|
||||||
}
|
}
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue