retrieval: add honor label handling and parametrized querying.

This commit adds the honor_labels and params arguments to the scrape
config. This allows to specify query parameters used by the scrapers
and handling scraped labels with precedence.
pull/786/head
Fabian Reinartz 2015-06-22 22:35:19 +02:00
parent 9016917d1c
commit dc7d27ab9a
8 changed files with 129 additions and 4 deletions

View File

@ -21,9 +21,9 @@ import (
)
const (
// ExporterLabelPrefix is the label name prefix to prepend if a
// ExportedLabelPrefix is the label name prefix to prepend if a
// synthetic label is already present in the exported metrics.
ExporterLabelPrefix LabelName = "exporter_"
ExportedLabelPrefix LabelName = "exported_"
// MetricNameLabel is the label name indicating the metric name of a
// timeseries.

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"regexp"
"strings"
"time"
@ -62,6 +63,7 @@ var (
// configured globals.
MetricsPath: "/metrics",
Scheme: "http",
HonorLabels: false,
}
// The default Relabel configuration.
@ -190,6 +192,10 @@ func (c *GlobalConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type ScrapeConfig struct {
// The job name to which the job label is set by default.
JobName string `yaml:"job_name"`
// Indicator whether the scraped metrics should remain unmodified.
HonorLabels bool `yaml:"honor_labels,omitempty"`
// A set of query parameters with which the target is scraped.
Params url.Values `yaml:"params,omitempty"`
// How frequently to scrape the targets of this scrape config.
ScrapeInterval Duration `yaml:"scrape_interval,omitempty"`
// The timeout for scraping targets of this config.

View File

@ -36,6 +36,7 @@ var expectedConf = &Config{
{
JobName: "prometheus",
HonorLabels: true,
ScrapeInterval: Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,

View File

@ -16,6 +16,7 @@ rule_files:
scrape_configs:
- job_name: prometheus
honor_labels: true
# scrape_interval is defined by the configured global (15s).
# scrape_timeout is defined by the global default (10s).

View File

@ -38,6 +38,11 @@ type collectResultAppender struct {
}
func (a *collectResultAppender) Append(s *clientmodel.Sample) {
for ln, lv := range s.Metric {
if len(lv) == 0 {
delete(s.Metric, ln)
}
}
a.result = append(a.result, s)
}

View File

@ -166,6 +166,9 @@ type Target struct {
deadline time.Duration
// The time between two scrapes.
scrapeInterval time.Duration
// Whether the target's labels have precedence over the base labels
// assigned by the scraping instance.
honorLabels bool
}
// NewTarget creates a reasonably configured target for querying.
@ -198,11 +201,13 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels clientm
if cfg.BasicAuth != nil {
t.url.User = url.UserPassword(cfg.BasicAuth.Username, cfg.BasicAuth.Password)
}
t.url.RawQuery = cfg.Params.Encode()
t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
t.deadline = time.Duration(cfg.ScrapeTimeout)
t.httpClient = httputil.NewDeadlineClient(time.Duration(cfg.ScrapeTimeout))
t.honorLabels = cfg.HonorLabels
t.metaLabels = metaLabels
t.baseLabels = clientmodel.LabelSet{}
// All remaining internal labels will not be part of the label set.
@ -363,12 +368,29 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
for samples := range t.ingestedSamples {
for _, s := range samples {
s.Metric.MergeFromLabelSet(baseLabels, clientmodel.ExporterLabelPrefix)
if t.honorLabels {
// Merge the metric with the baseLabels for labels not already set in the
// metric. This also considers labels explicitly set to the empty string.
for ln, lv := range baseLabels {
if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv
}
}
} else {
// Merge the ingested metric with the base label set. On a collision the
// value of the label is stored in a label prefixed with the exported prefix.
for ln, lv := range baseLabels {
if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[clientmodel.ExportedLabelPrefix+ln] = v
}
s.Metric[ln] = lv
}
}
// Avoid the copy in Relabel if there are no configs.
if len(t.metricRelabelConfigs) > 0 {
labels, err := Relabel(clientmodel.LabelSet(s.Metric), t.metricRelabelConfigs...)
if err != nil {
log.Errorf("error while relabeling metric %s of instance %s: ", s.Metric, t.url, err)
log.Errorf("Error while relabeling metric %s of instance %s: %s", s.Metric, t.url, err)
continue
}
// Check if the timeseries was dropped.

View File

@ -44,6 +44,91 @@ func TestBaseLabels(t *testing.T) {
}
}
func TestOverwriteLabels(t *testing.T) {
type test struct {
metric string
resultNormal clientmodel.Metric
resultHonor clientmodel.Metric
}
var tests []test
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
for _, test := range tests {
w.Write([]byte(test.metric))
w.Write([]byte(" 1\n"))
}
},
),
)
defer server.Close()
addr := clientmodel.LabelValue(strings.Split(server.URL, "://")[1])
tests = []test{
{
metric: `foo{}`,
resultNormal: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
clientmodel.InstanceLabel: addr,
},
resultHonor: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
clientmodel.InstanceLabel: addr,
},
},
{
metric: `foo{instance=""}`,
resultNormal: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
clientmodel.InstanceLabel: addr,
},
resultHonor: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
},
},
{
metric: `foo{instance="other_instance"}`,
resultNormal: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
clientmodel.InstanceLabel: addr,
clientmodel.ExportedLabelPrefix + clientmodel.InstanceLabel: "other_instance",
},
resultHonor: clientmodel.Metric{
clientmodel.MetricNameLabel: "foo",
clientmodel.InstanceLabel: "other_instance",
},
},
}
target := newTestTarget(server.URL, 10*time.Millisecond, nil)
target.honorLabels = false
app := &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
}
for i, test := range tests {
if !reflect.DeepEqual(app.result[i].Metric, test.resultNormal) {
t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultNormal, app.result[i].Metric)
}
}
target.honorLabels = true
app = &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
}
for i, test := range tests {
if !reflect.DeepEqual(app.result[i].Metric, test.resultHonor) {
t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultHonor, app.result[i].Metric)
}
}
}
func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := newTestTarget("bad schema", 0, nil)

View File

@ -510,6 +510,11 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...clientmodel.Fing
// Append implements Storage.
func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
for ln, lv := range sample.Metric {
if len(lv) == 0 {
delete(sample.Metric, ln)
}
}
if s.getNumChunksToPersist() >= s.maxChunksToPersist {
log.Warnf(
"%d chunks waiting for persistence, sample ingestion suspended.",