mirror of https://github.com/prometheus/prometheus

scrape: catch errors when creating HTTP clients (#5182)

* scrape: catch errors when creating HTTP clients

This change makes sure that no scrape pool is created with a nil HTTP client.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Address Tariq's comment

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Address Brian's comment

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

branch pull/5215/head
parent 37e35f9e0c
commit 12708acd15
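
Editor's note: the core of the change is easiest to see in isolation. Below is a minimal, self-contained sketch (hypothetical names, not the Prometheus code) of the pattern the commit introduces: the constructor propagates the HTTP-client construction error instead of logging it and handing back a pool with a nil client.

package main

import (
	"errors"
	"fmt"
	"net/http"
)

type scrapePool struct{ client *http.Client }

// newScrapePool fails fast when the client cannot be built, so callers can
// log the error and skip the pool instead of scraping with a nil client.
func newScrapePool(client *http.Client, err error) (*scrapePool, error) {
	if err != nil {
		return nil, fmt.Errorf("error creating HTTP client: %v", err)
	}
	return &scrapePool{client: client}, nil
}

func main() {
	// Simulate a client-construction failure (e.g. an unreadable CA file).
	if _, err := newScrapePool(nil, errors.New("unable to load CA cert")); err != nil {
		fmt.Println(err) // the caller logs and skips; no pool is created
		return
	}
}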
scrape/manager.go
@@ -14,6 +14,7 @@
 package scrape
 
 import (
+	"fmt"
 	"reflect"
 	"sync"
 	"time"
@@ -104,18 +105,18 @@ func (m *Manager) reload() {
 	m.mtxScrape.Lock()
 	var wg sync.WaitGroup
 	for setName, groups := range m.targetSets {
-		var sp *scrapePool
-		existing, ok := m.scrapePools[setName]
-		if !ok {
+		if _, ok := m.scrapePools[setName]; !ok {
 			scrapeConfig, ok := m.scrapeConfigs[setName]
 			if !ok {
 				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
 				continue
 			}
-			sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
+			sp, err := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
+			if err != nil {
+				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
+				continue
+			}
 			m.scrapePools[setName] = sp
-		} else {
-			sp = existing
-		}
+		}
 
 		wg.Add(1)
@@ -123,7 +124,7 @@ func (m *Manager) reload() {
 		go func(sp *scrapePool, groups []*targetgroup.Group) {
 			sp.Sync(groups)
 			wg.Done()
-		}(sp, groups)
+		}(m.scrapePools[setName], groups)
 
 	}
 	m.mtxScrape.Unlock()
@@ -158,16 +159,24 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
 	}
 	m.scrapeConfigs = c
 
-	// Cleanup and reload pool if config has changed.
+	// Cleanup and reload pool if the configuration has changed.
+	var failed bool
 	for name, sp := range m.scrapePools {
 		if cfg, ok := m.scrapeConfigs[name]; !ok {
 			sp.stop()
 			delete(m.scrapePools, name)
 		} else if !reflect.DeepEqual(sp.config, cfg) {
-			sp.reload(cfg)
+			err := sp.reload(cfg)
+			if err != nil {
+				level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
+				failed = true
+			}
 		}
 	}
 
+	if failed {
+		return fmt.Errorf("failed to apply the new configuration")
+	}
 	return nil
 }
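Editor's note: ApplyConfig above adopts a "keep going, fail at the end" shape: each broken pool is logged and skipped, and a single aggregate error is returned once. A runnable sketch of that pattern (the pool type and its reload method are hypothetical stand-ins, not Prometheus code):

package main

import (
	"fmt"
	"log"
)

type pool struct{ broken bool }

func (p pool) reload() error {
	if p.broken {
		return fmt.Errorf("invalid TLS configuration")
	}
	return nil
}

func applyAll(pools map[string]pool) error {
	var failed bool
	for name, p := range pools {
		if err := p.reload(); err != nil {
			// Log and continue so one bad pool doesn't block the others.
			log.Printf("error reloading scrape pool %q: %v", name, err)
			failed = true
		}
	}
	if failed {
		return fmt.Errorf("failed to apply the new configuration")
	}
	return nil
}

func main() {
	pools := map[string]pool{"job1": {broken: true}, "job2": {}}
	if err := applyAll(pools); err != nil {
		fmt.Println(err) // "failed to apply the new configuration"
	}
}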
scrape/manager_test.go
@@ -222,47 +222,115 @@ func TestPopulateLabels(t *testing.T) {
 	}
 }
 
-// TestScrapeManagerReloadNoChange tests that no scrape reload happens when there is no config change.
-func TestManagerReloadNoChange(t *testing.T) {
-	tsetName := "test"
+func loadConfiguration(t *testing.T, c string) *config.Config {
+	t.Helper()
 
-	cfgText := `
+	cfg := &config.Config{}
+	if err := yaml.UnmarshalStrict([]byte(c), cfg); err != nil {
+		t.Fatalf("Unable to load YAML config: %s", err)
+	}
+	return cfg
+}
+
+func noopLoop() loop {
+	return &testLoop{
+		startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
+		stopFunc:  func() {},
+	}
+}
+
+func TestManagerApplyConfig(t *testing.T) {
+	// Valid initial configuration.
+	cfgText1 := `
 scrape_configs:
- - job_name: '` + tsetName + `'
+ - job_name: job1
   static_configs:
-  - targets: ["foo:9090"]
+  - targets: ["bar:9090"]
 `
-	cfg := &config.Config{}
-	if err := yaml.UnmarshalStrict([]byte(cfgText), cfg); err != nil {
-		t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
-	}
+	// Invalid configuration.
+	cfgText2 := `
+scrape_configs:
+ - job_name: job1
+   scheme: https
+   static_configs:
+   - targets: ["foo:9090"]
+   tls_config:
+     ca_file: /not/existing/ca/file
+`
+	// Valid configuration.
+	cfgText3 := `
+scrape_configs:
+ - job_name: job1
+   scheme: https
+   static_configs:
+   - targets: ["foo:9090"]
+`
+	var (
+		cfg1 = loadConfiguration(t, cfgText1)
+		cfg2 = loadConfiguration(t, cfgText2)
+		cfg3 = loadConfiguration(t, cfgText3)
+
+		ch = make(chan struct{}, 1)
+	)
 
 	scrapeManager := NewManager(nil, nil)
-	// Load the current config.
-	scrapeManager.ApplyConfig(cfg)
-
-	// As reload never happens, new loop should never be called.
 	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*relabel.Config) loop {
-		t.Fatal("reload happened")
-		return nil
+		ch <- struct{}{}
+		return noopLoop()
 	}
-
 	sp := &scrapePool{
 		appendable:    &nopAppendable{},
 		activeTargets: map[uint64]*Target{},
 		loops: map[uint64]loop{
-			1: &testLoop{},
+			1: noopLoop(),
 		},
 		newLoop: newLoop,
 		logger:  nil,
-		config:  cfg.ScrapeConfigs[0],
+		config:  cfg1.ScrapeConfigs[0],
 	}
 	scrapeManager.scrapePools = map[string]*scrapePool{
-		tsetName: sp,
+		"job1": sp,
 	}
 
-	scrapeManager.ApplyConfig(cfg)
+	// Apply the initial configuration.
+	if err := scrapeManager.ApplyConfig(cfg1); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
+
+	// Apply a configuration for which the reload fails.
+	if err := scrapeManager.ApplyConfig(cfg2); err == nil {
+		t.Fatalf("expecting error but got none")
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
+
+	// Apply a configuration for which the reload succeeds.
+	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+	default:
+		t.Fatal("reload didn't happen")
+	}
+
+	// Re-applying the same configuration shouldn't trigger a reload.
+	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
+		t.Fatalf("unable to apply configuration: %s", err)
+	}
+	select {
+	case <-ch:
+		t.Fatal("reload happened")
+	default:
+	}
 }
 
 func TestManagerTargetsUpdates(t *testing.T) {
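Editor's note: the test above probes "did a reload fire?" with a buffered channel and a non-blocking select, instead of calling t.Fatal inside the callback (which is unsafe off the test goroutine). A stripped-down sketch of that pattern:

package main

import "fmt"

func main() {
	ch := make(chan struct{}, 1) // buffered so the callback never blocks

	callback := func() { ch <- struct{}{} }

	// Callback not invoked: the default branch fires.
	select {
	case <-ch:
		fmt.Println("reload happened")
	default:
		fmt.Println("no reload")
	}

	// Callback invoked: the receive drains the signal.
	callback()
	select {
	case <-ch:
		fmt.Println("reload happened")
	default:
		fmt.Println("no reload")
	}
}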
scrape/scrape.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
@@ -61,6 +62,30 @@ var (
 		},
 		[]string{"interval"},
 	)
+	targetScrapePools = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pools_total",
+			Help: "Total number of scrape pool creation attempts.",
+		},
+	)
+	targetScrapePoolsFailed = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pools_failed_total",
+			Help: "Total number of scrape pool creations that failed.",
+		},
+	)
+	targetScrapePoolReloads = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pool_reloads_total",
+			Help: "Total number of scrape loop reloads.",
+		},
+	)
+	targetScrapePoolReloadsFailed = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pool_reloads_failed_total",
+			Help: "Total number of failed scrape loop reloads.",
+		},
+	)
 	targetSyncIntervalLength = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Name: "prometheus_target_sync_length_seconds",
@@ -105,6 +130,10 @@ var (
 func init() {
 	prometheus.MustRegister(targetIntervalLength)
 	prometheus.MustRegister(targetReloadIntervalLength)
+	prometheus.MustRegister(targetScrapePools)
+	prometheus.MustRegister(targetScrapePoolsFailed)
+	prometheus.MustRegister(targetScrapePoolReloads)
+	prometheus.MustRegister(targetScrapePoolReloadsFailed)
 	prometheus.MustRegister(targetSyncIntervalLength)
 	prometheus.MustRegister(targetScrapePoolSyncsCounter)
 	prometheus.MustRegister(targetScrapeSampleLimit)
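Editor's note: once registered via prometheus.MustRegister, the four new counters are exported on the /metrics endpoint like any other client_golang metric. A minimal standalone server to see one of them (standard client_golang usage, not part of this commit):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var targetScrapePools = prometheus.NewCounter(
	prometheus.CounterOpts{
		Name: "prometheus_target_scrape_pools_total",
		Help: "Total number of scrape pool creation attempts.",
	},
)

func main() {
	prometheus.MustRegister(targetScrapePools)
	targetScrapePools.Inc() // e.g. one scrape pool creation attempt

	// curl localhost:8080/metrics | grep scrape_pools
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}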
@@ -136,15 +165,16 @@ const maxAheadTime = 10 * time.Minute
 
 type labelsMutator func(labels.Labels) labels.Labels
 
-func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
+func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) (*scrapePool, error) {
+	targetScrapePools.Inc()
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
 	if err != nil {
-		// Any errors that could occur here should be caught during config validation.
-		level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
+		targetScrapePoolsFailed.Inc()
+		return nil, errors.Wrap(err, "error creating HTTP client")
 	}
 
 	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
@@ -182,7 +212,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
 		)
 	}
 
-	return sp
+	return sp, nil
 }
 
 func (sp *scrapePool) ActiveTargets() []*Target {
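Editor's note: the error that newScrapePool now surfaces comes from config_util.NewClientFromConfig. A sketch of exercising that path directly, assuming the github.com/prometheus/common/config API of this vintage (NewClientFromConfig(cfg, name)):

package main

import (
	"fmt"

	config_util "github.com/prometheus/common/config"
)

func main() {
	cfg := config_util.HTTPClientConfig{
		TLSConfig: config_util.TLSConfig{
			// Same bad input the new TestManagerApplyConfig uses (cfgText2).
			CAFile: "/not/existing/ca/file",
		},
	}
	client, err := config_util.NewClientFromConfig(cfg, "job1")
	if err != nil {
		// Previously newScrapePool only logged this; now it propagates it.
		fmt.Println("error creating HTTP client:", err)
		return
	}
	_ = client
}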
@@ -227,7 +257,8 @@ func (sp *scrapePool) stop() {
 // reload the scrape pool with the given scrape configuration. The target state is preserved
 // but all scrape loops are restarted with the new scrape configuration.
 // This method returns after all scrape loops that were stopped have stopped scraping.
-func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
+func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
+	targetScrapePoolReloads.Inc()
 	start := time.Now()
 
 	sp.mtx.Lock()
@@ -235,8 +266,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
 	if err != nil {
-		// Any errors that could occur here should be caught during config validation.
-		level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
+		targetScrapePoolReloadsFailed.Inc()
+		return errors.Wrap(err, "error creating HTTP client")
 	}
 	sp.config = cfg
 	sp.client = client
@@ -272,6 +303,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
 		time.Since(start).Seconds(),
 	)
+	return nil
 }
 
 // Sync converts target groups into actual scrape targets and synchronizes
scrape/scrape_test.go
@@ -29,16 +29,14 @@ import (
 	"testing"
 	"time"
 
-	"github.com/prometheus/prometheus/pkg/relabel"
-
+	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
-	dto "github.com/prometheus/client_model/go"
-
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/pkg/value"
@@ -48,9 +46,9 @@ import (
 
 func TestNewScrapePool(t *testing.T) {
 	var (
-		app = &nopAppendable{}
-		cfg = &config.ScrapeConfig{}
-		sp  = newScrapePool(cfg, app, nil)
+		app   = &nopAppendable{}
+		cfg   = &config.ScrapeConfig{}
+		sp, _ = newScrapePool(cfg, app, nil)
 	)
 
 	if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -85,7 +83,7 @@ func TestDroppedTargetsList(t *testing.T) {
 				},
 			},
 		}
-		sp                     = newScrapePool(cfg, app, nil)
+		sp, _                  = newScrapePool(cfg, app, nil)
 		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __metrics_path__=\"\", __scheme__=\"\", job=\"dropMe\"}"
 		expectedLength         = 1
 	)
@@ -307,7 +305,7 @@ func TestScrapePoolReload(t *testing.T) {
 func TestScrapePoolAppender(t *testing.T) {
 	cfg := &config.ScrapeConfig{}
 	app := &nopAppendable{}
-	sp := newScrapePool(cfg, app, nil)
+	sp, _ := newScrapePool(cfg, app, nil)
 
 	loop := sp.newLoop(&Target{}, nil, 0, false, nil)
 	appl, ok := loop.(*scrapeLoop)
@@ -350,7 +348,7 @@ func TestScrapePoolRaces(t *testing.T) {
 	newConfig := func() *config.ScrapeConfig {
 		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
 	}
-	sp := newScrapePool(newConfig(), &nopAppendable{}, nil)
+	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, nil)
 	tgts := []*targetgroup.Group{
 		{
 			Targets: []model.LabelSet{