mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1284 lines
34 KiB
1284 lines
34 KiB
// Copyright 2013 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package scrape |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net/http" |
|
"net/http/httptest" |
|
"net/url" |
|
"os" |
|
"sort" |
|
"strconv" |
|
"sync" |
|
"testing" |
|
"time" |
|
|
|
"github.com/go-kit/log" |
|
"github.com/gogo/protobuf/proto" |
|
"github.com/prometheus/client_golang/prometheus" |
|
dto "github.com/prometheus/client_model/go" |
|
"github.com/prometheus/common/model" |
|
"github.com/stretchr/testify/require" |
|
"google.golang.org/protobuf/types/known/timestamppb" |
|
"gopkg.in/yaml.v2" |
|
|
|
"github.com/prometheus/prometheus/config" |
|
"github.com/prometheus/prometheus/discovery" |
|
_ "github.com/prometheus/prometheus/discovery/file" |
|
"github.com/prometheus/prometheus/discovery/targetgroup" |
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/model/relabel" |
|
"github.com/prometheus/prometheus/util/runutil" |
|
"github.com/prometheus/prometheus/util/testutil" |
|
) |
|
|
|
func init() { |
|
// This can be removed when the default validation scheme in common is updated. |
|
model.NameValidationScheme = model.UTF8Validation |
|
} |
|
|
|
func TestPopulateLabels(t *testing.T) { |
|
cases := []struct { |
|
in labels.Labels |
|
cfg *config.ScrapeConfig |
|
noDefaultPort bool |
|
res labels.Labels |
|
resOrig labels.Labels |
|
err string |
|
}{ |
|
// Regular population of scrape config options. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
"custom": "value", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.InstanceLabel: "1.2.3.4:1000", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
"custom": "value", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
"custom": "value", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
}, |
|
// Pre-define/overwrite scrape config labels. |
|
// Leave out port and expect it to be defaulted to scheme. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.SchemeLabel: "http", |
|
model.MetricsPathLabel: "/custom", |
|
model.JobLabel: "custom-job", |
|
model.ScrapeIntervalLabel: "2s", |
|
model.ScrapeTimeoutLabel: "2s", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:80", |
|
model.InstanceLabel: "1.2.3.4:80", |
|
model.SchemeLabel: "http", |
|
model.MetricsPathLabel: "/custom", |
|
model.JobLabel: "custom-job", |
|
model.ScrapeIntervalLabel: "2s", |
|
model.ScrapeTimeoutLabel: "2s", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.SchemeLabel: "http", |
|
model.MetricsPathLabel: "/custom", |
|
model.JobLabel: "custom-job", |
|
model.ScrapeIntervalLabel: "2s", |
|
model.ScrapeTimeoutLabel: "2s", |
|
}), |
|
}, |
|
// Provide instance label. HTTPS port default for IPv6. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "[::1]", |
|
model.InstanceLabel: "custom-instance", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "[::1]:443", |
|
model.InstanceLabel: "custom-instance", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "[::1]", |
|
model.InstanceLabel: "custom-instance", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
}, |
|
// Address label missing. |
|
{ |
|
in: labels.FromStrings("custom", "value"), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "no address", |
|
}, |
|
// Address label missing, but added in relabelling. |
|
{ |
|
in: labels.FromStrings("custom", "host:1234"), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
RelabelConfigs: []*relabel.Config{ |
|
{ |
|
Action: relabel.Replace, |
|
Regex: relabel.MustNewRegexp("(.*)"), |
|
SourceLabels: model.LabelNames{"custom"}, |
|
Replacement: "${1}", |
|
TargetLabel: string(model.AddressLabel), |
|
}, |
|
}, |
|
}, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "host:1234", |
|
model.InstanceLabel: "host:1234", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
"custom": "host:1234", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
"custom": "host:1234", |
|
}), |
|
}, |
|
// Address label missing, but added in relabelling. |
|
{ |
|
in: labels.FromStrings("custom", "host:1234"), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
RelabelConfigs: []*relabel.Config{ |
|
{ |
|
Action: relabel.Replace, |
|
Regex: relabel.MustNewRegexp("(.*)"), |
|
SourceLabels: model.LabelNames{"custom"}, |
|
Replacement: "${1}", |
|
TargetLabel: string(model.AddressLabel), |
|
}, |
|
}, |
|
}, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "host:1234", |
|
model.InstanceLabel: "host:1234", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
"custom": "host:1234", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
"custom": "host:1234", |
|
}), |
|
}, |
|
// Invalid UTF-8 in label. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
"custom": "\xbd", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "invalid label value for \"custom\": \"\\xbd\"", |
|
}, |
|
// Invalid duration in interval label. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.ScrapeIntervalLabel: "2notseconds", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "error parsing scrape interval: unknown unit \"notseconds\" in duration \"2notseconds\"", |
|
}, |
|
// Invalid duration in timeout label. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.ScrapeTimeoutLabel: "2notseconds", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "error parsing scrape timeout: unknown unit \"notseconds\" in duration \"2notseconds\"", |
|
}, |
|
// 0 interval in timeout label. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.ScrapeIntervalLabel: "0s", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "scrape interval cannot be 0", |
|
}, |
|
// 0 duration in timeout label. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.ScrapeTimeoutLabel: "0s", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "scrape timeout cannot be 0", |
|
}, |
|
// Timeout less than interval. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:1000", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "2s", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
res: labels.EmptyLabels(), |
|
resOrig: labels.EmptyLabels(), |
|
err: "scrape timeout cannot be greater than scrape interval (\"2s\" > \"1s\")", |
|
}, |
|
// Don't attach default port. |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
noDefaultPort: true, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.InstanceLabel: "1.2.3.4", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
}, |
|
// Remove default port (http). |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:80", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "http", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
noDefaultPort: true, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.InstanceLabel: "1.2.3.4:80", |
|
model.SchemeLabel: "http", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:80", |
|
model.SchemeLabel: "http", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
}, |
|
// Remove default port (https). |
|
{ |
|
in: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:443", |
|
}), |
|
cfg: &config.ScrapeConfig{ |
|
Scheme: "https", |
|
MetricsPath: "/metrics", |
|
JobName: "job", |
|
ScrapeInterval: model.Duration(time.Second), |
|
ScrapeTimeout: model.Duration(time.Second), |
|
}, |
|
noDefaultPort: true, |
|
res: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4", |
|
model.InstanceLabel: "1.2.3.4:443", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
resOrig: labels.FromMap(map[string]string{ |
|
model.AddressLabel: "1.2.3.4:443", |
|
model.SchemeLabel: "https", |
|
model.MetricsPathLabel: "/metrics", |
|
model.JobLabel: "job", |
|
model.ScrapeIntervalLabel: "1s", |
|
model.ScrapeTimeoutLabel: "1s", |
|
}), |
|
}, |
|
} |
|
for _, c := range cases { |
|
in := c.in.Copy() |
|
|
|
res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg, c.noDefaultPort) |
|
if c.err != "" { |
|
require.EqualError(t, err, c.err) |
|
} else { |
|
require.NoError(t, err) |
|
} |
|
require.Equal(t, c.in, in) |
|
testutil.RequireEqual(t, c.res, res) |
|
testutil.RequireEqual(t, c.resOrig, orig) |
|
} |
|
} |
|
|
|
func loadConfiguration(t testing.TB, c string) *config.Config { |
|
t.Helper() |
|
|
|
cfg := &config.Config{} |
|
err := yaml.UnmarshalStrict([]byte(c), cfg) |
|
require.NoError(t, err, "Unable to load YAML config.") |
|
|
|
return cfg |
|
} |
|
|
|
func noopLoop() loop { |
|
return &testLoop{ |
|
startFunc: func(interval, timeout time.Duration, errc chan<- error) {}, |
|
stopFunc: func() {}, |
|
} |
|
} |
|
|
|
func TestManagerApplyConfig(t *testing.T) { |
|
// Valid initial configuration. |
|
cfgText1 := ` |
|
scrape_configs: |
|
- job_name: job1 |
|
static_configs: |
|
- targets: ["foo:9090"] |
|
` |
|
// Invalid configuration. |
|
cfgText2 := ` |
|
scrape_configs: |
|
- job_name: job1 |
|
scheme: https |
|
static_configs: |
|
- targets: ["foo:9090"] |
|
tls_config: |
|
ca_file: /not/existing/ca/file |
|
` |
|
// Valid configuration. |
|
cfgText3 := ` |
|
scrape_configs: |
|
- job_name: job1 |
|
scheme: https |
|
static_configs: |
|
- targets: ["foo:9090"] |
|
` |
|
var ( |
|
cfg1 = loadConfiguration(t, cfgText1) |
|
cfg2 = loadConfiguration(t, cfgText2) |
|
cfg3 = loadConfiguration(t, cfgText3) |
|
|
|
ch = make(chan struct{}, 1) |
|
|
|
testRegistry = prometheus.NewRegistry() |
|
) |
|
|
|
opts := Options{} |
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) |
|
require.NoError(t, err) |
|
newLoop := func(scrapeLoopOptions) loop { |
|
ch <- struct{}{} |
|
return noopLoop() |
|
} |
|
sp := &scrapePool{ |
|
appendable: &nopAppendable{}, |
|
activeTargets: map[uint64]*Target{ |
|
1: {}, |
|
}, |
|
loops: map[uint64]loop{ |
|
1: noopLoop(), |
|
}, |
|
newLoop: newLoop, |
|
logger: nil, |
|
config: cfg1.ScrapeConfigs[0], |
|
client: http.DefaultClient, |
|
metrics: scrapeManager.metrics, |
|
symbolTable: labels.NewSymbolTable(), |
|
} |
|
scrapeManager.scrapePools = map[string]*scrapePool{ |
|
"job1": sp, |
|
} |
|
|
|
// Apply the initial configuration. |
|
err = scrapeManager.ApplyConfig(cfg1) |
|
require.NoError(t, err, "Unable to apply configuration.") |
|
select { |
|
case <-ch: |
|
require.FailNow(t, "Reload happened.") |
|
default: |
|
} |
|
|
|
// Apply a configuration for which the reload fails. |
|
err = scrapeManager.ApplyConfig(cfg2) |
|
require.Error(t, err, "Expecting error but got none.") |
|
select { |
|
case <-ch: |
|
require.FailNow(t, "Reload happened.") |
|
default: |
|
} |
|
|
|
// Apply a configuration for which the reload succeeds. |
|
err = scrapeManager.ApplyConfig(cfg3) |
|
require.NoError(t, err, "Unable to apply configuration.") |
|
select { |
|
case <-ch: |
|
default: |
|
require.FailNow(t, "Reload didn't happen.") |
|
} |
|
|
|
// Re-applying the same configuration shouldn't trigger a reload. |
|
err = scrapeManager.ApplyConfig(cfg3) |
|
require.NoError(t, err, "Unable to apply configuration.") |
|
select { |
|
case <-ch: |
|
require.FailNow(t, "Reload happened.") |
|
default: |
|
} |
|
} |
|
|
|
func TestManagerTargetsUpdates(t *testing.T) { |
|
opts := Options{} |
|
testRegistry := prometheus.NewRegistry() |
|
m, err := NewManager(&opts, nil, nil, nil, testRegistry) |
|
require.NoError(t, err) |
|
|
|
ts := make(chan map[string][]*targetgroup.Group) |
|
go m.Run(ts) |
|
defer m.Stop() |
|
|
|
tgSent := make(map[string][]*targetgroup.Group) |
|
for x := 0; x < 10; x++ { |
|
tgSent[strconv.Itoa(x)] = []*targetgroup.Group{ |
|
{ |
|
Source: strconv.Itoa(x), |
|
}, |
|
} |
|
|
|
select { |
|
case ts <- tgSent: |
|
case <-time.After(10 * time.Millisecond): |
|
require.Fail(t, "Scrape manager's channel remained blocked after the set threshold.") |
|
} |
|
} |
|
|
|
m.mtxScrape.Lock() |
|
tsetActual := m.targetSets |
|
m.mtxScrape.Unlock() |
|
|
|
// Make sure all updates have been received. |
|
require.Equal(t, tgSent, tsetActual) |
|
|
|
select { |
|
case <-m.triggerReload: |
|
default: |
|
require.Fail(t, "No scrape loops reload was triggered after targets update.") |
|
} |
|
} |
|
|
|
func TestSetOffsetSeed(t *testing.T) { |
|
getConfig := func(prometheus string) *config.Config { |
|
cfgText := ` |
|
global: |
|
external_labels: |
|
prometheus: '` + prometheus + `' |
|
` |
|
|
|
cfg := &config.Config{} |
|
err := yaml.UnmarshalStrict([]byte(cfgText), cfg) |
|
require.NoError(t, err, "Unable to load YAML config cfgYaml.") |
|
|
|
return cfg |
|
} |
|
|
|
opts := Options{} |
|
testRegistry := prometheus.NewRegistry() |
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) |
|
require.NoError(t, err) |
|
|
|
// Load the first config. |
|
cfg1 := getConfig("ha1") |
|
err = scrapeManager.setOffsetSeed(cfg1.GlobalConfig.ExternalLabels) |
|
require.NoError(t, err) |
|
offsetSeed1 := scrapeManager.offsetSeed |
|
|
|
require.NotZero(t, offsetSeed1, "Offset seed has to be a hash of uint64.") |
|
|
|
// Load the first config. |
|
cfg2 := getConfig("ha2") |
|
require.NoError(t, scrapeManager.setOffsetSeed(cfg2.GlobalConfig.ExternalLabels)) |
|
offsetSeed2 := scrapeManager.offsetSeed |
|
|
|
require.NotEqual(t, offsetSeed1, offsetSeed2, "Offset seed should not be the same on different set of external labels.") |
|
} |
|
|
|
func TestManagerScrapePools(t *testing.T) { |
|
cfgText1 := ` |
|
scrape_configs: |
|
- job_name: job1 |
|
static_configs: |
|
- targets: ["foo:9090"] |
|
- job_name: job2 |
|
static_configs: |
|
- targets: ["foo:9091", "foo:9092"] |
|
` |
|
cfgText2 := ` |
|
scrape_configs: |
|
- job_name: job1 |
|
static_configs: |
|
- targets: ["foo:9090", "foo:9094"] |
|
- job_name: job3 |
|
static_configs: |
|
- targets: ["foo:9093"] |
|
` |
|
var ( |
|
cfg1 = loadConfiguration(t, cfgText1) |
|
cfg2 = loadConfiguration(t, cfgText2) |
|
testRegistry = prometheus.NewRegistry() |
|
) |
|
|
|
reload := func(scrapeManager *Manager, cfg *config.Config) { |
|
newLoop := func(scrapeLoopOptions) loop { |
|
return noopLoop() |
|
} |
|
scrapeManager.scrapePools = map[string]*scrapePool{} |
|
for _, sc := range cfg.ScrapeConfigs { |
|
_, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
sp := &scrapePool{ |
|
appendable: &nopAppendable{}, |
|
activeTargets: map[uint64]*Target{}, |
|
loops: map[uint64]loop{ |
|
1: noopLoop(), |
|
}, |
|
newLoop: newLoop, |
|
logger: nil, |
|
config: sc, |
|
client: http.DefaultClient, |
|
cancel: cancel, |
|
} |
|
for _, c := range sc.ServiceDiscoveryConfigs { |
|
staticConfig := c.(discovery.StaticConfig) |
|
for _, group := range staticConfig { |
|
for i := range group.Targets { |
|
sp.activeTargets[uint64(i)] = &Target{} |
|
} |
|
} |
|
} |
|
scrapeManager.scrapePools[sc.JobName] = sp |
|
} |
|
} |
|
|
|
opts := Options{} |
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) |
|
require.NoError(t, err) |
|
|
|
reload(scrapeManager, cfg1) |
|
require.ElementsMatch(t, []string{"job1", "job2"}, scrapeManager.ScrapePools()) |
|
|
|
reload(scrapeManager, cfg2) |
|
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) |
|
} |
|
|
|
// TestManagerCTZeroIngestion tests scrape manager for CT cases. |
|
func TestManagerCTZeroIngestion(t *testing.T) { |
|
const mName = "expected_counter" |
|
|
|
for _, tc := range []struct { |
|
name string |
|
counterSample *dto.Counter |
|
enableCTZeroIngestion bool |
|
}{ |
|
{ |
|
name: "disabled with CT on counter", |
|
counterSample: &dto.Counter{ |
|
Value: proto.Float64(1.0), |
|
// Timestamp does not matter as long as it exists in this test. |
|
CreatedTimestamp: timestamppb.Now(), |
|
}, |
|
}, |
|
{ |
|
name: "enabled with CT on counter", |
|
counterSample: &dto.Counter{ |
|
Value: proto.Float64(1.0), |
|
// Timestamp does not matter as long as it exists in this test. |
|
CreatedTimestamp: timestamppb.Now(), |
|
}, |
|
enableCTZeroIngestion: true, |
|
}, |
|
{ |
|
name: "enabled without CT on counter", |
|
counterSample: &dto.Counter{ |
|
Value: proto.Float64(1.0), |
|
}, |
|
enableCTZeroIngestion: true, |
|
}, |
|
} { |
|
t.Run(tc.name, func(t *testing.T) { |
|
app := &collectResultAppender{} |
|
scrapeManager, err := NewManager( |
|
&Options{ |
|
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, |
|
skipOffsetting: true, |
|
}, |
|
log.NewLogfmtLogger(os.Stderr), |
|
nil, |
|
&collectResultAppendable{app}, |
|
prometheus.NewRegistry(), |
|
) |
|
require.NoError(t, err) |
|
|
|
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ |
|
GlobalConfig: config.GlobalConfig{ |
|
// Disable regular scrapes. |
|
ScrapeInterval: model.Duration(9999 * time.Minute), |
|
ScrapeTimeout: model.Duration(5 * time.Second), |
|
// Ensure the proto is chosen. We need proto as it's the only protocol |
|
// with the CT parsing support. |
|
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, |
|
}, |
|
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, |
|
})) |
|
|
|
once := sync.Once{} |
|
// Start fake HTTP target to that allow one scrape only. |
|
server := httptest.NewServer( |
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|
fail := true |
|
once.Do(func() { |
|
fail = false |
|
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) |
|
|
|
ctrType := dto.MetricType_COUNTER |
|
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ |
|
Name: proto.String(mName), |
|
Type: &ctrType, |
|
Metric: []*dto.Metric{{Counter: tc.counterSample}}, |
|
})) |
|
}) |
|
|
|
if fail { |
|
w.WriteHeader(http.StatusInternalServerError) |
|
} |
|
}), |
|
) |
|
defer server.Close() |
|
|
|
serverURL, err := url.Parse(server.URL) |
|
require.NoError(t, err) |
|
|
|
// Add fake target directly into tsets + reload. Normally users would use |
|
// Manager.Run and wait for minimum 5s refresh interval. |
|
scrapeManager.updateTsets(map[string][]*targetgroup.Group{ |
|
"test": {{ |
|
Targets: []model.LabelSet{{ |
|
model.SchemeLabel: model.LabelValue(serverURL.Scheme), |
|
model.AddressLabel: model.LabelValue(serverURL.Host), |
|
}}, |
|
}}, |
|
}) |
|
scrapeManager.reload() |
|
|
|
var got []float64 |
|
// Wait for one scrape. |
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) |
|
defer cancel() |
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { |
|
app.mtx.Lock() |
|
defer app.mtx.Unlock() |
|
|
|
// Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug |
|
// and it's not worth waiting. |
|
for _, f := range app.resultFloats { |
|
if f.metric.Get(model.MetricNameLabel) == mName { |
|
got = append(got, f.f) |
|
} |
|
} |
|
if len(app.resultFloats) > 0 { |
|
return nil |
|
} |
|
return fmt.Errorf("expected some samples, got none") |
|
}), "after 1 minute") |
|
scrapeManager.Stop() |
|
|
|
// Check for zero samples, assuming we only injected always one sample. |
|
// Did it contain CT to inject? If yes, was CT zero enabled? |
|
if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion { |
|
require.Len(t, got, 2) |
|
require.Equal(t, 0.0, got[0]) |
|
require.Equal(t, tc.counterSample.GetValue(), got[1]) |
|
return |
|
} |
|
|
|
// Expect only one, valid sample. |
|
require.Len(t, got, 1) |
|
require.Equal(t, tc.counterSample.GetValue(), got[0]) |
|
}) |
|
} |
|
} |
|
|
|
func TestUnregisterMetrics(t *testing.T) { |
|
reg := prometheus.NewRegistry() |
|
// Check that all metrics can be unregistered, allowing a second manager to be created. |
|
for i := 0; i < 2; i++ { |
|
opts := Options{} |
|
manager, err := NewManager(&opts, nil, nil, nil, reg) |
|
require.NotNil(t, manager) |
|
require.NoError(t, err) |
|
// Unregister all metrics. |
|
manager.UnregisterMetrics() |
|
} |
|
} |
|
|
|
func applyConfig( |
|
t *testing.T, |
|
config string, |
|
scrapeManager *Manager, |
|
discoveryManager *discovery.Manager, |
|
) { |
|
t.Helper() |
|
|
|
cfg := loadConfiguration(t, config) |
|
require.NoError(t, scrapeManager.ApplyConfig(cfg)) |
|
|
|
c := make(map[string]discovery.Configs) |
|
scfgs, err := cfg.GetScrapeConfigs() |
|
require.NoError(t, err) |
|
for _, v := range scfgs { |
|
c[v.JobName] = v.ServiceDiscoveryConfigs |
|
} |
|
require.NoError(t, discoveryManager.ApplyConfig(c)) |
|
} |
|
|
|
func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) { |
|
t.Helper() |
|
|
|
reg := prometheus.NewRegistry() |
|
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) |
|
require.NoError(t, err) |
|
discoveryManager := discovery.NewManager( |
|
ctx, |
|
log.NewNopLogger(), |
|
reg, |
|
sdMetrics, |
|
discovery.Updatert(100*time.Millisecond), |
|
) |
|
scrapeManager, err := NewManager( |
|
&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)}, |
|
nil, |
|
nil, |
|
nopAppendable{}, |
|
prometheus.NewRegistry(), |
|
) |
|
require.NoError(t, err) |
|
go discoveryManager.Run() |
|
go scrapeManager.Run(discoveryManager.SyncCh()) |
|
return discoveryManager, scrapeManager |
|
} |
|
|
|
func writeIntoFile(t *testing.T, content, filePattern string) *os.File { |
|
t.Helper() |
|
|
|
file, err := os.CreateTemp("", filePattern) |
|
require.NoError(t, err) |
|
_, err = file.WriteString(content) |
|
require.NoError(t, err) |
|
return file |
|
} |
|
|
|
func requireTargets( |
|
t *testing.T, |
|
scrapeManager *Manager, |
|
jobName string, |
|
waitToAppear bool, |
|
expectedTargets []string, |
|
) { |
|
t.Helper() |
|
|
|
require.Eventually(t, func() bool { |
|
targets, ok := scrapeManager.TargetsActive()[jobName] |
|
if !ok { |
|
if waitToAppear { |
|
return false |
|
} |
|
t.Fatalf("job %s shouldn't be dropped", jobName) |
|
} |
|
if expectedTargets == nil { |
|
return targets == nil |
|
} |
|
if len(targets) != len(expectedTargets) { |
|
return false |
|
} |
|
sTargets := []string{} |
|
for _, t := range targets { |
|
sTargets = append(sTargets, t.String()) |
|
} |
|
sort.Strings(expectedTargets) |
|
sort.Strings(sTargets) |
|
for i, t := range sTargets { |
|
if t != expectedTargets[i] { |
|
return false |
|
} |
|
} |
|
return true |
|
}, 1*time.Second, 100*time.Millisecond) |
|
} |
|
|
|
// TestTargetDisappearsAfterProviderRemoved makes sure that when a provider is dropped, (only) its targets are dropped. |
|
func TestTargetDisappearsAfterProviderRemoved(t *testing.T) { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
myJob := "my-job" |
|
myJobSDTargetURL := "my:9876" |
|
myJobStaticTargetURL := "my:5432" |
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL) |
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json") |
|
|
|
baseConfig := ` |
|
scrape_configs: |
|
- job_name: %s |
|
static_configs: |
|
- targets: ['%s'] |
|
file_sd_configs: |
|
- files: ['%s'] |
|
` |
|
|
|
discoveryManager, scrapeManager := runManagers(t, ctx) |
|
defer scrapeManager.Stop() |
|
|
|
applyConfig( |
|
t, |
|
fmt.Sprintf( |
|
baseConfig, |
|
myJob, |
|
myJobStaticTargetURL, |
|
sDFile.Name(), |
|
), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
// Make sure the jobs targets are taken into account |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
myJob, |
|
true, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL), |
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL), |
|
}, |
|
) |
|
|
|
// Apply a new config where a provider is removed |
|
baseConfig = ` |
|
scrape_configs: |
|
- job_name: %s |
|
static_configs: |
|
- targets: ['%s'] |
|
` |
|
applyConfig( |
|
t, |
|
fmt.Sprintf( |
|
baseConfig, |
|
myJob, |
|
myJobStaticTargetURL, |
|
), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
// Make sure the corresponding target was dropped |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
myJob, |
|
false, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL), |
|
}, |
|
) |
|
|
|
// Apply a new config with no providers |
|
baseConfig = ` |
|
scrape_configs: |
|
- job_name: %s |
|
` |
|
applyConfig( |
|
t, |
|
fmt.Sprintf( |
|
baseConfig, |
|
myJob, |
|
), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
// Make sure the corresponding target was dropped |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
myJob, |
|
false, |
|
nil, |
|
) |
|
} |
|
|
|
// TestOnlyProviderStaleTargetsAreDropped makes sure that when a job has only one provider with multiple targets |
|
// and when the provider can no longer discover some of those targets, only those stale targets are dropped. |
|
func TestOnlyProviderStaleTargetsAreDropped(t *testing.T) { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
jobName := "my-job" |
|
jobTarget1URL := "foo:9876" |
|
jobTarget2URL := "foo:5432" |
|
|
|
sdFile1Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget1URL) |
|
sdFile2Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget2URL) |
|
sDFile1 := writeIntoFile(t, sdFile1Content, "*targets.json") |
|
sDFile2 := writeIntoFile(t, sdFile2Content, "*targets.json") |
|
|
|
baseConfig := ` |
|
scrape_configs: |
|
- job_name: %s |
|
file_sd_configs: |
|
- files: ['%s', '%s'] |
|
` |
|
discoveryManager, scrapeManager := runManagers(t, ctx) |
|
defer scrapeManager.Stop() |
|
|
|
applyConfig( |
|
t, |
|
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), sDFile2.Name()), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
|
|
// Make sure the job's targets are taken into account |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
jobName, |
|
true, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", jobTarget1URL), |
|
fmt.Sprintf("http://%s/metrics", jobTarget2URL), |
|
}, |
|
) |
|
|
|
// Apply the same config for the same job but with a non existing file to make the provider |
|
// unable to discover some targets |
|
applyConfig( |
|
t, |
|
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), "/idontexistdoi.json"), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
|
|
// The old target should get dropped |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
jobName, |
|
false, |
|
[]string{fmt.Sprintf("http://%s/metrics", jobTarget1URL)}, |
|
) |
|
} |
|
|
|
// TestProviderStaleTargetsAreDropped makes sure that when a job has only one provider and when that provider |
|
// should no longer discover targets, the targets of that provider are dropped. |
|
// See: https://github.com/prometheus/prometheus/issues/12858 |
|
func TestProviderStaleTargetsAreDropped(t *testing.T) { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
jobName := "my-job" |
|
jobTargetURL := "foo:9876" |
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTargetURL) |
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json") |
|
|
|
baseConfig := ` |
|
scrape_configs: |
|
- job_name: %s |
|
file_sd_configs: |
|
- files: ['%s'] |
|
` |
|
discoveryManager, scrapeManager := runManagers(t, ctx) |
|
defer scrapeManager.Stop() |
|
|
|
applyConfig( |
|
t, |
|
fmt.Sprintf(baseConfig, jobName, sDFile.Name()), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
|
|
// Make sure the job's targets are taken into account |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
jobName, |
|
true, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", jobTargetURL), |
|
}, |
|
) |
|
|
|
// Apply the same config for the same job but with a non existing file to make the provider |
|
// unable to discover some targets |
|
applyConfig( |
|
t, |
|
fmt.Sprintf(baseConfig, jobName, "/idontexistdoi.json"), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
|
|
// The old target should get dropped |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
jobName, |
|
false, |
|
nil, |
|
) |
|
} |
|
|
|
// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers, when one of them should no |
|
// longer discover targets, only the stale targets of that provider are dropped. |
|
func TestOnlyStaleTargetsAreDropped(t *testing.T) { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
myJob := "my-job" |
|
myJobSDTargetURL := "my:9876" |
|
myJobStaticTargetURL := "my:5432" |
|
otherJob := "other-job" |
|
otherJobTargetURL := "other:1234" |
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL) |
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json") |
|
|
|
baseConfig := ` |
|
scrape_configs: |
|
- job_name: %s |
|
static_configs: |
|
- targets: ['%s'] |
|
file_sd_configs: |
|
- files: ['%s'] |
|
- job_name: %s |
|
static_configs: |
|
- targets: ['%s'] |
|
` |
|
|
|
discoveryManager, scrapeManager := runManagers(t, ctx) |
|
defer scrapeManager.Stop() |
|
|
|
// Apply the initial config with an existing file |
|
applyConfig( |
|
t, |
|
fmt.Sprintf( |
|
baseConfig, |
|
myJob, |
|
myJobStaticTargetURL, |
|
sDFile.Name(), |
|
otherJob, |
|
otherJobTargetURL, |
|
), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
// Make sure the jobs targets are taken into account |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
myJob, |
|
true, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL), |
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL), |
|
}, |
|
) |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
otherJob, |
|
true, |
|
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)}, |
|
) |
|
|
|
// Apply the same config with a non existing file for myJob |
|
applyConfig( |
|
t, |
|
fmt.Sprintf( |
|
baseConfig, |
|
myJob, |
|
myJobStaticTargetURL, |
|
"/idontexistdoi.json", |
|
otherJob, |
|
otherJobTargetURL, |
|
), |
|
scrapeManager, |
|
discoveryManager, |
|
) |
|
|
|
// Only the SD target should get dropped for myJob |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
myJob, |
|
false, |
|
[]string{ |
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL), |
|
}, |
|
) |
|
// The otherJob should keep its target |
|
requireTargets( |
|
t, |
|
scrapeManager, |
|
otherJob, |
|
false, |
|
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)}, |
|
) |
|
}
|
|
|