Merge pull request #645 from fabxc/fabxc/sd

Make target manager source based.
pull/662/head
Julius Volz 2015-04-27 18:30:33 +02:00
commit dad766e794
19 changed files with 938 additions and 752 deletions

View File

@ -26,14 +26,25 @@ const (
// timeseries. // timeseries.
MetricNameLabel LabelName = "__name__" MetricNameLabel LabelName = "__name__"
// AddressLabel is the name of the label that holds the address of
// a scrape target.
AddressLabel LabelName = "__address__"
// ReservedLabelPrefix is a prefix which is not legal in user-supplied // ReservedLabelPrefix is a prefix which is not legal in user-supplied
// label names. // label names.
ReservedLabelPrefix = "__" ReservedLabelPrefix = "__"
// MetaLabelPrefix is a prefix for labels that provide meta information.
// Labels with the prefix will not be attached to time series.
MetaLabelPrefix = "__meta_"
// JobLabel is the label name indicating the job from which a timeseries // JobLabel is the label name indicating the job from which a timeseries
// was scraped. // was scraped.
JobLabel LabelName = "job" JobLabel LabelName = "job"
// InstanceLabel is the label name used for the instance label.
InstanceLabel LabelName = "instance"
// BucketLabel is used for the label that defines the upper bound of a // BucketLabel is used for the label that defines the upper bound of a
// bucket of a histogram ("le" -> "less or equal"). // bucket of a histogram ("le" -> "less or equal").
BucketLabel = "le" BucketLabel = "le"

View File

@ -16,6 +16,7 @@ package config
import ( import (
"fmt" "fmt"
"regexp" "regexp"
"strings"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -55,6 +56,21 @@ func (c Config) validateLabels(labels *pb.LabelPairs) error {
return nil return nil
} }
// validateHosts validates whether a target group contains valid hosts.
func (c Config) validateHosts(hosts []string) error {
if hosts == nil {
return nil
}
for _, host := range hosts {
// Make sure that this does not contain any paths or schemes.
// This ensures that old configurations error.
if strings.Contains(host, "/") {
return fmt.Errorf("invalid host '%s', no schemes or paths allowed", host)
}
}
return nil
}
// Validate checks an entire parsed Config for the validity of its fields. // Validate checks an entire parsed Config for the validity of its fields.
func (c Config) Validate() error { func (c Config) Validate() error {
// Check the global configuration section for validity. // Check the global configuration section for validity.
@ -93,6 +109,9 @@ func (c Config) Validate() error {
if err := c.validateLabels(targetGroup.Labels); err != nil { if err := c.validateLabels(targetGroup.Labels); err != nil {
return fmt.Errorf("invalid labels for job '%s': %s", job.GetName(), err) return fmt.Errorf("invalid labels for job '%s': %s", job.GetName(), err)
} }
if err := c.validateHosts(targetGroup.Target); err != nil {
return fmt.Errorf("invalid targets for job '%s': %s", job.GetName(), err)
}
} }
if job.SdName != nil && len(job.TargetGroup) > 0 { if job.SdName != nil && len(job.TargetGroup) > 0 {
return fmt.Errorf("specified both DNS-SD name and target group for job: %s", job.GetName()) return fmt.Errorf("specified both DNS-SD name and target group for job: %s", job.GetName())
@ -115,7 +134,7 @@ func (c Config) GetJobByName(name string) *JobConfig {
// GlobalLabels returns the global labels as a LabelSet. // GlobalLabels returns the global labels as a LabelSet.
func (c Config) GlobalLabels() clientmodel.LabelSet { func (c Config) GlobalLabels() clientmodel.LabelSet {
labels := clientmodel.LabelSet{} labels := clientmodel.LabelSet{}
if c.Global.Labels != nil { if c.Global != nil && c.Global.Labels != nil {
for _, label := range c.Global.Labels.Label { for _, label := range c.Global.Labels.Label {
labels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue()) labels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue())
} }
@ -156,6 +175,11 @@ type JobConfig struct {
pb.JobConfig pb.JobConfig
} }
// SDRefreshInterval gets the the SD refresh interval for a job.
func (c JobConfig) SDRefreshInterval() time.Duration {
return stringToDuration(c.GetSdRefreshInterval())
}
// ScrapeInterval gets the scrape interval for a job. // ScrapeInterval gets the scrape interval for a job.
func (c JobConfig) ScrapeInterval() time.Duration { func (c JobConfig) ScrapeInterval() time.Duration {
return stringToDuration(c.GetScrapeInterval()) return stringToDuration(c.GetScrapeInterval())
@ -165,3 +189,19 @@ func (c JobConfig) ScrapeInterval() time.Duration {
func (c JobConfig) ScrapeTimeout() time.Duration { func (c JobConfig) ScrapeTimeout() time.Duration {
return stringToDuration(c.GetScrapeTimeout()) return stringToDuration(c.GetScrapeTimeout())
} }
// TargetGroup is derived from a protobuf TargetGroup and attaches a source to it
// that identifies the origin of the group.
type TargetGroup struct {
// Source is an identifier that describes a group of targets.
Source string
// Labels is a set of labels that is common across all targets in the group.
Labels clientmodel.LabelSet
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
Targets []clientmodel.LabelSet
}
func (tg *TargetGroup) String() string {
return tg.Source
}

View File

@ -71,8 +71,10 @@ message JobConfig {
// List of labeled target groups for this job. Only legal when DNS-SD isn't // List of labeled target groups for this job. Only legal when DNS-SD isn't
// used for a job. // used for a job.
repeated TargetGroup target_group = 5; repeated TargetGroup target_group = 5;
// The HTTP resource path to fetch metrics from on targets. // The HTTP resource path on which to fetch metrics from targets.
optional string metrics_path = 6 [default = "/metrics"]; optional string metrics_path = 6 [default = "/metrics"];
// The URL scheme with which to fetch metrics from targets.
optional string scheme = 8 [default = "http"];
} }
// The top-level Prometheus configuration. // The top-level Prometheus configuration.

View File

@ -13,8 +13,10 @@ global <
job: < job: <
name: "prometheus" name: "prometheus"
scrape_interval: "15s" scrape_interval: "15s"
metrics_path: "/metrics"
scheme: "http"
target_group: < target_group: <
target: "http://localhost:9090/metrics.json" target: "localhost:9090"
> >
> >

View File

@ -2,6 +2,6 @@ job: <
name: "testjob" name: "testjob"
sd_name: "sd_name" sd_name: "sd_name"
target_group: < target_group: <
target: "http://sampletarget:8080/metrics.json" target: "sampletarget:8080"
> >
> >

View File

@ -15,7 +15,7 @@ job: <
scrape_interval: "15s" scrape_interval: "15s"
target_group: < target_group: <
target: "http://localhost:9090/metrics.json" target: "localhost:9090"
labels: < labels: <
label: < label: <
name: "group" name: "group"
@ -30,11 +30,12 @@ job: <
scrape_interval: "30s" scrape_interval: "30s"
target_group: < target_group: <
target: "http://random.com:8080/metrics.json" target: "random.com:8080"
target: "http://random.com:8081/metrics.json" target: "random.com:8081"
target: "http://random.com:8082/metrics.json" target: "random.com:8082"
target: "http://random.com:8083/metrics.json" target: "random.com:8083"
target: "http://random.com:8084/metrics.json" target: "random.com:8084"
labels: < labels: <
label: < label: <
name: "group" name: "group"
@ -43,8 +44,9 @@ job: <
> >
> >
target_group: < target_group: <
target: "http://random.com:8085/metrics.json" target: "random.com:8085"
target: "http://random.com:8086/metrics.json" target: "random.com:8086"
labels: < labels: <
label: < label: <
name: "group" name: "group"

View File

@ -169,8 +169,10 @@ type JobConfig struct {
// List of labeled target groups for this job. Only legal when DNS-SD isn't // List of labeled target groups for this job. Only legal when DNS-SD isn't
// used for a job. // used for a job.
TargetGroup []*TargetGroup `protobuf:"bytes,5,rep,name=target_group" json:"target_group,omitempty"` TargetGroup []*TargetGroup `protobuf:"bytes,5,rep,name=target_group" json:"target_group,omitempty"`
// The HTTP resource path to fetch metrics from on targets. // The HTTP resource path on which to fetch metrics from targets.
MetricsPath *string `protobuf:"bytes,6,opt,name=metrics_path,def=/metrics" json:"metrics_path,omitempty"` MetricsPath *string `protobuf:"bytes,6,opt,name=metrics_path,def=/metrics" json:"metrics_path,omitempty"`
// The URL scheme with which to fetch metrics from targets.
Scheme *string `protobuf:"bytes,8,opt,name=scheme,def=http" json:"scheme,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
@ -181,6 +183,7 @@ func (*JobConfig) ProtoMessage() {}
const Default_JobConfig_ScrapeTimeout string = "10s" const Default_JobConfig_ScrapeTimeout string = "10s"
const Default_JobConfig_SdRefreshInterval string = "30s" const Default_JobConfig_SdRefreshInterval string = "30s"
const Default_JobConfig_MetricsPath string = "/metrics" const Default_JobConfig_MetricsPath string = "/metrics"
const Default_JobConfig_Scheme string = "http"
func (m *JobConfig) GetName() string { func (m *JobConfig) GetName() string {
if m != nil && m.Name != nil { if m != nil && m.Name != nil {
@ -231,6 +234,13 @@ func (m *JobConfig) GetMetricsPath() string {
return Default_JobConfig_MetricsPath return Default_JobConfig_MetricsPath
} }
func (m *JobConfig) GetScheme() string {
if m != nil && m.Scheme != nil {
return *m.Scheme
}
return Default_JobConfig_Scheme
}
// The top-level Prometheus configuration. // The top-level Prometheus configuration.
type PrometheusConfig struct { type PrometheusConfig struct {
// Global Prometheus configuration options. If omitted, an empty global // Global Prometheus configuration options. If omitted, an empty global

12
main.go
View File

@ -77,7 +77,7 @@ var (
type prometheus struct { type prometheus struct {
ruleManager manager.RuleManager ruleManager manager.RuleManager
targetManager retrieval.TargetManager targetManager *retrieval.TargetManager
notificationHandler *notification.NotificationHandler notificationHandler *notification.NotificationHandler
storage local.Storage storage local.Storage
remoteStorageQueues []*remote.StorageQueueManager remoteStorageQueues []*remote.StorageQueueManager
@ -152,8 +152,11 @@ func NewPrometheus() *prometheus {
sampleAppender = fanout sampleAppender = fanout
} }
targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) targetManager, err := retrieval.NewTargetManager(conf, sampleAppender)
targetManager.AddTargetsFromConfig(conf) if err != nil {
glog.Errorf("Error creating target manager: %s", err)
os.Exit(1)
}
ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{ ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
SampleAppender: sampleAppender, SampleAppender: sampleAppender,
@ -176,7 +179,7 @@ func NewPrometheus() *prometheus {
BuildInfo: BuildInfo, BuildInfo: BuildInfo,
Config: conf.String(), Config: conf.String(),
RuleManager: ruleManager, RuleManager: ruleManager,
TargetPools: targetManager.Pools(), TargetPools: targetManager.Pools,
Flags: flags, Flags: flags,
Birth: time.Now(), Birth: time.Now(),
PathPrefix: *pathPrefix, PathPrefix: *pathPrefix,
@ -231,6 +234,7 @@ func (p *prometheus) Serve() {
} }
go p.ruleManager.Run() go p.ruleManager.Run()
go p.notificationHandler.Run() go p.notificationHandler.Run()
go p.targetManager.Run()
p.storage.Start() p.storage.Start()

View File

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors // Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
@ -11,13 +11,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package retrieval package discovery
import ( import (
"fmt" "fmt"
"net" "net"
"net/url"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -25,12 +25,18 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/utility"
) )
const resolvConf = "/etc/resolv.conf" const (
resolvConf = "/etc/resolv.conf"
dnsSourcePrefix = "dns"
// Constants for instrumentation.
namespace = "prometheus"
interval = "interval"
)
var ( var (
dnsSDLookupsCount = prometheus.NewCounter( dnsSDLookupsCount = prometheus.NewCounter(
@ -52,65 +58,70 @@ func init() {
prometheus.MustRegister(dnsSDLookupsCount) prometheus.MustRegister(dnsSDLookupsCount)
} }
// TargetProvider encapsulates retrieving all targets for a job. // DNSDiscovery periodically performs DNS-SD requests. It implements
type TargetProvider interface { // the TargetProvider interface.
// Retrieves the current list of targets for this provider. type DNSDiscovery struct {
Targets() ([]Target, error) name string
done chan struct{}
ticker *time.Ticker
m sync.RWMutex
} }
type sdTargetProvider struct { // NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets.
job config.JobConfig func NewDNSDiscovery(name string, refreshInterval time.Duration) *DNSDiscovery {
globalLabels clientmodel.LabelSet return &DNSDiscovery{
targets []Target name: name,
done: make(chan struct{}),
lastRefresh time.Time ticker: time.NewTicker(refreshInterval),
refreshInterval time.Duration
}
// NewSdTargetProvider constructs a new sdTargetProvider for a job.
func NewSdTargetProvider(job config.JobConfig, globalLabels clientmodel.LabelSet) *sdTargetProvider {
i, err := utility.StringToDuration(job.GetSdRefreshInterval())
if err != nil {
panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err))
}
return &sdTargetProvider{
job: job,
globalLabels: globalLabels,
refreshInterval: i,
} }
} }
func (p *sdTargetProvider) Targets() ([]Target, error) { // Run implements the TargetProvider interface.
var err error func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup) {
defer func() { defer close(ch)
// Get an initial set right away.
if err := dd.refresh(ch); err != nil {
glog.Errorf("Error refreshing DNS targets: %s", err)
}
for {
select {
case <-dd.ticker.C:
if err := dd.refresh(ch); err != nil {
glog.Errorf("Error refreshing DNS targets: %s", err)
}
case <-dd.done:
return
}
}
}
// Stop implements the TargetProvider interface.
func (dd *DNSDiscovery) Stop() {
glog.V(1).Info("Stopping DNS discovery for %s...", dd.name)
dd.ticker.Stop()
dd.done <- struct{}{}
glog.V(1).Info("DNS discovery for %s stopped.", dd.name)
}
// Sources implements the TargetProvider interface.
func (dd *DNSDiscovery) Sources() []string {
return []string{dnsSourcePrefix + ":" + dd.name}
}
func (dd *DNSDiscovery) refresh(ch chan<- *config.TargetGroup) error {
response, err := lookupSRV(dd.name)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
dnsSDLookupFailuresCount.Inc() dnsSDLookupFailuresCount.Inc()
} return err
}()
if time.Since(p.lastRefresh) < p.refreshInterval {
return p.targets, nil
} }
response, err := lookupSRV(p.job.GetSdName()) tg := &config.TargetGroup{}
if err != nil {
return nil, err
}
baseLabels := clientmodel.LabelSet{
clientmodel.JobLabel: clientmodel.LabelValue(p.job.GetName()),
}
for n, v := range p.globalLabels {
baseLabels[n] = v
}
targets := make([]Target, 0, len(response.Answer))
endpoint := &url.URL{
Scheme: "http",
Path: p.job.GetMetricsPath(),
}
for _, record := range response.Answer { for _, record := range response.Answer {
addr, ok := record.(*dns.SRV) addr, ok := record.(*dns.SRV)
if !ok { if !ok {
@ -118,22 +129,24 @@ func (p *sdTargetProvider) Targets() ([]Target, error) {
continue continue
} }
// Remove the final dot from rooted DNS names to make them look more usual. // Remove the final dot from rooted DNS names to make them look more usual.
if addr.Target[len(addr.Target)-1] == '.' { addr.Target = strings.TrimRight(addr.Target, ".")
addr.Target = addr.Target[:len(addr.Target)-1]
} target := clientmodel.LabelValue(fmt.Sprintf("%s:%d", addr.Target, addr.Port))
endpoint.Host = fmt.Sprintf("%s:%d", addr.Target, addr.Port) tg.Targets = append(tg.Targets, clientmodel.LabelSet{
t := NewTarget(endpoint.String(), p.job.ScrapeTimeout(), baseLabels) clientmodel.AddressLabel: target,
targets = append(targets, t) })
} }
p.targets = targets tg.Source = dnsSourcePrefix + ":" + dd.name
return targets, nil ch <- tg
return nil
} }
func lookupSRV(name string) (*dns.Msg, error) { func lookupSRV(name string) (*dns.Msg, error) {
conf, err := dns.ClientConfigFromFile(resolvConf) conf, err := dns.ClientConfigFromFile(resolvConf)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't load resolv.conf: %s", err) return nil, fmt.Errorf("could not load resolv.conf: %s", err)
} }
client := &dns.Client{} client := &dns.Client{}
@ -143,20 +156,20 @@ func lookupSRV(name string) (*dns.Msg, error) {
servAddr := net.JoinHostPort(server, conf.Port) servAddr := net.JoinHostPort(server, conf.Port)
for _, suffix := range conf.Search { for _, suffix := range conf.Search {
response, err = lookup(name, dns.TypeSRV, client, servAddr, suffix, false) response, err = lookup(name, dns.TypeSRV, client, servAddr, suffix, false)
if err == nil { if err != nil {
glog.Warningf("resolving %s.%s failed: %s", name, suffix, err)
continue
}
if len(response.Answer) > 0 { if len(response.Answer) > 0 {
return response, nil return response, nil
} }
} else {
glog.Warningf("resolving %s.%s failed: %s", name, suffix, err)
}
} }
response, err = lookup(name, dns.TypeSRV, client, servAddr, "", false) response, err = lookup(name, dns.TypeSRV, client, servAddr, "", false)
if err == nil { if err == nil {
return response, nil return response, nil
} }
} }
return response, fmt.Errorf("couldn't resolve %s: No server responded", name) return response, fmt.Errorf("could not resolve %s: No server responded", name)
} }
func lookup(name string, queryType uint16, client *dns.Client, servAddr string, suffix string, edns bool) (*dns.Msg, error) { func lookup(name string, queryType uint16, client *dns.Client, servAddr string, suffix string, edns bool) (*dns.Msg, error) {
@ -179,7 +192,6 @@ func lookup(name string, queryType uint16, client *dns.Client, servAddr string,
if err != nil { if err != nil {
return nil, err return nil, err
} }
if msg.Id != response.Id { if msg.Id != response.Id {
return nil, fmt.Errorf("DNS ID mismatch, request: %d, response: %d", msg.Id, response.Id) return nil, fmt.Errorf("DNS ID mismatch, request: %d, response: %d", msg.Id, response.Id)
} }
@ -188,11 +200,9 @@ func lookup(name string, queryType uint16, client *dns.Client, servAddr string,
if client.Net == "tcp" { if client.Net == "tcp" {
return nil, fmt.Errorf("got truncated message on tcp") return nil, fmt.Errorf("got truncated message on tcp")
} }
if edns { // Truncated even though EDNS is used if edns { // Truncated even though EDNS is used
client.Net = "tcp" client.Net = "tcp"
} }
return lookup(name, queryType, client, servAddr, suffix, !edns) return lookup(name, queryType, client, servAddr, suffix, !edns)
} }

View File

@ -17,6 +17,8 @@ import (
"time" "time"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
) )
type nopAppender struct{} type nopAppender struct{}
@ -38,3 +40,25 @@ type collectResultAppender struct {
func (a *collectResultAppender) Append(s *clientmodel.Sample) { func (a *collectResultAppender) Append(s *clientmodel.Sample) {
a.result = append(a.result, s) a.result = append(a.result, s)
} }
// fakeTargetProvider implements a TargetProvider and allows manual injection
// of TargetGroups through the update channel.
type fakeTargetProvider struct {
sources []string
update chan *config.TargetGroup
}
func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup) {
defer close(ch)
for tg := range tp.update {
ch <- tg
}
}
func (tp *fakeTargetProvider) Stop() {
close(tp.update)
}
func (tp *fakeTargetProvider) Sources() []string {
return tp.sources
}

View File

@ -1,25 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"testing"
)
func TestInterfaces(t *testing.T) {
var (
_ Target = &target{}
_ TargetManager = &targetManager{}
)
}

View File

@ -30,13 +30,12 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
const ( const (
// InstanceLabel is the label value used for the instance label.
InstanceLabel clientmodel.LabelName = "instance"
// ScrapeHealthMetricName is the metric name for the synthetic health // ScrapeHealthMetricName is the metric name for the synthetic health
// variable. // variable.
scrapeHealthMetricName clientmodel.LabelValue = "up" scrapeHealthMetricName clientmodel.LabelValue = "up"
@ -54,7 +53,7 @@ const (
var ( var (
errIngestChannelFull = errors.New("ingestion channel full") errIngestChannelFull = errors.New("ingestion channel full")
localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"} localhostRepresentations = []string{"127.0.0.1", "localhost"}
targetIntervalLength = prometheus.NewSummaryVec( targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
@ -131,23 +130,16 @@ type Target interface {
// Return the target's base labels without job and instance label. That's // Return the target's base labels without job and instance label. That's
// useful for display purposes. // useful for display purposes.
BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet
// SetBaseLabelsFrom sets the target's base labels to the base labels // Start scraping the target in regular intervals.
// of the provided target. RunScraper(storage.SampleAppender)
SetBaseLabelsFrom(Target)
// Scrape target at the specified interval.
RunScraper(storage.SampleAppender, time.Duration)
// Stop scraping, synchronous. // Stop scraping, synchronous.
StopScraper() StopScraper()
// Update the target's state.
Update(config.JobConfig, clientmodel.LabelSet)
} }
// target is a Target that refers to a singular HTTP or HTTPS endpoint. // target is a Target that refers to a singular HTTP or HTTPS endpoint.
type target struct { type target struct {
// The current health state of the target.
state TargetState
// The last encountered scrape error, if any.
lastError error
// The last time a scrape was attempted.
lastScrape time.Time
// Closing scraperStopping signals that scraping should stop. // Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{} scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped. // Closing scraperStopped signals that scraping has been stopped.
@ -155,34 +147,67 @@ type target struct {
// Channel to buffer ingested samples. // Channel to buffer ingested samples.
ingestedSamples chan clientmodel.Samples ingestedSamples chan clientmodel.Samples
url string
// What is the deadline for the HTTP or HTTPS against this endpoint.
deadline time.Duration
// Any base labels that are added to this target and its metrics.
baseLabels clientmodel.LabelSet
// The HTTP client used to scrape the target's endpoint. // The HTTP client used to scrape the target's endpoint.
httpClient *http.Client httpClient *http.Client
// Mutex protects lastError, lastScrape, state, and baseLabels. // Mutex protects the members below.
sync.RWMutex sync.RWMutex
url *url.URL
// Any base labels that are added to this target and its metrics.
baseLabels clientmodel.LabelSet
// The current health state of the target.
state TargetState
// The last encountered scrape error, if any.
lastError error
// The last time a scrape was attempted.
lastScrape time.Time
// What is the deadline for the HTTP or HTTPS against this endpoint.
deadline time.Duration
// The time between two scrapes.
scrapeInterval time.Duration
} }
// NewTarget creates a reasonably configured target for querying. // NewTarget creates a reasonably configured target for querying.
func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { func NewTarget(address string, cfg config.JobConfig, baseLabels clientmodel.LabelSet) Target {
t := &target{ t := &target{
url: url, url: &url.URL{
deadline: deadline, Host: address,
httpClient: utility.NewDeadlineClient(deadline), },
scraperStopping: make(chan struct{}), scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}), scraperStopped: make(chan struct{}),
} }
t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())} t.Update(cfg, baseLabels)
for baseLabel, baseValue := range baseLabels {
t.baseLabels[baseLabel] = baseValue
}
return t return t
} }
// Update overwrites settings in the target that are derived from the job config
// it belongs to.
func (t *target) Update(cfg config.JobConfig, baseLabels clientmodel.LabelSet) {
t.Lock()
defer t.Unlock()
t.url.Scheme = cfg.GetScheme()
t.url.Path = cfg.GetMetricsPath()
t.scrapeInterval = cfg.ScrapeInterval()
t.deadline = cfg.ScrapeTimeout()
t.httpClient = utility.NewDeadlineClient(cfg.ScrapeTimeout())
t.baseLabels = clientmodel.LabelSet{
clientmodel.InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier()),
}
for name, val := range baseLabels {
t.baseLabels[name] = val
}
}
func (t *target) String() string {
t.RLock()
defer t.RUnlock()
return t.url.Host
}
// Ingest implements Target and extraction.Ingester. // Ingest implements Target and extraction.Ingester.
func (t *target) Ingest(s clientmodel.Samples) error { func (t *target) Ingest(s clientmodel.Samples) error {
// Since the regular case is that ingestedSamples is ready to receive, // Since the regular case is that ingestedSamples is ready to receive,
@ -202,10 +227,16 @@ func (t *target) Ingest(s clientmodel.Samples) error {
} }
// RunScraper implements Target. // RunScraper implements Target.
func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) { func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped) defer close(t.scraperStopped)
jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) t.RLock()
lastScrapeInterval := t.scrapeInterval
t.RUnlock()
glog.V(1).Infof("Starting scraper for target %v...", t)
jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64()))
select { select {
case <-jitterTimer.C: case <-jitterTimer.C:
case <-t.scraperStopping: case <-t.scraperStopping:
@ -214,7 +245,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time
} }
jitterTimer.Stop() jitterTimer.Stop()
ticker := time.NewTicker(interval) ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop() defer ticker.Stop()
t.Lock() // Writing t.lastScrape requires the lock. t.Lock() // Writing t.lastScrape requires the lock.
@ -238,11 +269,21 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time
case <-t.scraperStopping: case <-t.scraperStopping:
return return
case <-ticker.C: case <-ticker.C:
t.Lock() // Write t.lastScrape requires locking. t.Lock()
took := time.Since(t.lastScrape) took := time.Since(t.lastScrape)
t.lastScrape = time.Now() t.lastScrape = time.Now()
intervalStr := lastScrapeInterval.String()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if lastScrapeInterval != t.scrapeInterval {
ticker = time.NewTicker(t.scrapeInterval)
lastScrapeInterval = t.scrapeInterval
}
t.Unlock() t.Unlock()
targetIntervalLength.WithLabelValues(interval.String()).Observe(
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision. float64(took) / float64(time.Second), // Sub-second precision.
) )
t.scrape(sampleAppender) t.scrape(sampleAppender)
@ -253,8 +294,12 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time
// StopScraper implements Target. // StopScraper implements Target.
func (t *target) StopScraper() { func (t *target) StopScraper() {
glog.V(1).Infof("Stopping scraper for target %v...", t)
close(t.scraperStopping) close(t.scraperStopping)
<-t.scraperStopped <-t.scraperStopped
glog.V(1).Infof("Scraper for target %v stopped.", t)
} }
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
@ -277,7 +322,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
t.Unlock() t.Unlock()
}(time.Now()) }(time.Now())
req, err := http.NewRequest("GET", t.URL(), nil) req, err := http.NewRequest("GET", t.url.String(), nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -339,41 +384,43 @@ func (t *target) LastScrape() time.Time {
// URL implements Target. // URL implements Target.
func (t *target) URL() string { func (t *target) URL() string {
return t.url t.RLock()
defer t.RUnlock()
return t.url.String()
} }
// InstanceIdentifier implements Target. // InstanceIdentifier implements Target.
func (t *target) InstanceIdentifier() string { func (t *target) InstanceIdentifier() string {
u, err := url.Parse(t.url)
if err != nil {
glog.Warningf("Could not parse instance URL when generating identifier, using raw URL: %s", err)
return t.url
}
// If we are given a port in the host port, use that. // If we are given a port in the host port, use that.
if strings.Contains(u.Host, ":") { if strings.Contains(t.url.Host, ":") {
return u.Host return t.url.Host
}
// Otherwise, deduce port based on protocol.
if u.Scheme == "http" {
return fmt.Sprintf("%s:80", u.Host)
} else if u.Scheme == "https" {
return fmt.Sprintf("%s:443", u.Host)
} }
glog.Warningf("Unknown scheme %s when generating identifier, using raw URL.", u.Scheme) t.RLock()
return t.url defer t.RUnlock()
// Otherwise, deduce port based on protocol.
if t.url.Scheme == "http" {
return fmt.Sprintf("%s:80", t.url.Host)
} else if t.url.Scheme == "https" {
return fmt.Sprintf("%s:443", t.url.Host)
}
glog.Warningf("Unknown scheme %s when generating identifier, using host without port number.", t.url.Scheme)
return t.url.Host
} }
// GlobalURL implements Target. // GlobalURL implements Target.
func (t *target) GlobalURL() string { func (t *target) GlobalURL() string {
url := t.url url := t.URL()
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err) glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err)
return url return url
} }
for _, localhostRepresentation := range localhostRepresentations { for _, localhostRepresentation := range localhostRepresentations {
url = strings.Replace(url, localhostRepresentation, fmt.Sprintf("http://%s", hostname), -1) url = strings.Replace(url, "//"+localhostRepresentation, "//"+hostname, 1)
} }
return url return url
} }
@ -389,23 +436,13 @@ func (t *target) BaseLabels() clientmodel.LabelSet {
func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet { func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet {
ls := clientmodel.LabelSet{} ls := clientmodel.LabelSet{}
for ln, lv := range t.BaseLabels() { for ln, lv := range t.BaseLabels() {
if ln != clientmodel.JobLabel && ln != InstanceLabel { if ln != clientmodel.JobLabel && ln != clientmodel.InstanceLabel {
ls[ln] = lv ls[ln] = lv
} }
} }
return ls return ls
} }
// SetBaseLabelsFrom implements Target.
func (t *target) SetBaseLabelsFrom(newTarget Target) {
if t.URL() != newTarget.URL() {
panic("targets don't refer to the same endpoint")
}
t.Lock()
defer t.Unlock()
t.baseLabels = newTarget.BaseLabels()
}
func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
healthMetric := clientmodel.Metric{} healthMetric := clientmodel.Metric{}
durationMetric := clientmodel.Metric{} durationMetric := clientmodel.Metric{}

View File

@ -18,56 +18,46 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"reflect" "reflect"
"strings"
"testing" "testing"
"time" "time"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/golang/protobuf/proto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
func TestTargetInterface(t *testing.T) {
var _ Target = &target{}
}
func TestBaseLabels(t *testing.T) { func TestBaseLabels(t *testing.T) {
target := NewTarget("http://example.com/metrics", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) target := newTestTarget("example.com", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"})
want := clientmodel.LabelSet{"job": "some_job", "foo": "bar", "instance": "example.com:80"} want := clientmodel.LabelSet{
clientmodel.JobLabel: "some_job",
clientmodel.InstanceLabel: "example.com:80",
"foo": "bar",
}
got := target.BaseLabels() got := target.BaseLabels()
if !reflect.DeepEqual(want, got) { if !reflect.DeepEqual(want, got) {
t.Errorf("want base labels %v, got %v", want, got) t.Errorf("want base labels %v, got %v", want, got)
} }
delete(want, "job") delete(want, clientmodel.JobLabel)
delete(want, "instance") delete(want, clientmodel.InstanceLabel)
got = target.BaseLabelsWithoutJobAndInstance() got = target.BaseLabelsWithoutJobAndInstance()
if !reflect.DeepEqual(want, got) { if !reflect.DeepEqual(want, got) {
t.Errorf("want base labels %v, got %v", want, got) t.Errorf("want base labels %v, got %v", want, got)
} }
} }
func TestTargetHidesURLAuth(t *testing.T) {
testVectors := []string{"http://secret:data@host.com/query?args#fragment", "https://example.net/foo", "http://foo.com:31337/bar"}
testResults := []string{"host.com:80", "example.net:443", "foo.com:31337"}
if len(testVectors) != len(testResults) {
t.Errorf("Test vector length does not match test result length.")
}
for i := 0; i < len(testVectors); i++ {
testTarget := target{
state: Unknown,
url: testVectors[i],
httpClient: utility.NewDeadlineClient(0),
}
u := testTarget.InstanceIdentifier()
if u != testResults[i] {
t.Errorf("Expected InstanceIdentifier to be %v, actual %v", testResults[i], u)
}
}
}
func TestTargetScrapeUpdatesState(t *testing.T) { func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := target{ testTarget := newTestTarget("bad schema", 0, nil)
state: Unknown,
url: "bad schema",
httpClient: utility.NewDeadlineClient(0),
}
testTarget.scrape(nopAppender{}) testTarget.scrape(nopAppender{})
if testTarget.state != Unhealthy { if testTarget.state != Unhealthy {
t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state)
@ -89,11 +79,7 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
) )
defer server.Close() defer server.Close()
testTarget := NewTarget( testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"})
server.URL,
10*time.Millisecond,
clientmodel.LabelSet{"dings": "bums"},
).(*target)
testTarget.scrape(slowAppender{}) testTarget.scrape(slowAppender{})
if testTarget.state != Unhealthy { if testTarget.state != Unhealthy {
@ -105,9 +91,10 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
} }
func TestTargetRecordScrapeHealth(t *testing.T) { func TestTargetRecordScrapeHealth(t *testing.T) {
testTarget := NewTarget( jcfg := config.JobConfig{}
"http://example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, proto.SetDefaults(&jcfg.JobConfig)
).(*target)
testTarget := newTestTarget("example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"})
now := clientmodel.Now() now := clientmodel.Now()
appender := &collectResultAppender{} appender := &collectResultAppender{}
@ -123,7 +110,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
expected := &clientmodel.Sample{ expected := &clientmodel.Sample{
Metric: clientmodel.Metric{ Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: scrapeHealthMetricName, clientmodel.MetricNameLabel: scrapeHealthMetricName,
InstanceLabel: "example.url:80", clientmodel.InstanceLabel: "example.url:80",
clientmodel.JobLabel: "testjob", clientmodel.JobLabel: "testjob",
}, },
Timestamp: now, Timestamp: now,
@ -138,7 +125,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
expected = &clientmodel.Sample{ expected = &clientmodel.Sample{
Metric: clientmodel.Metric{ Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: scrapeDurationMetricName, clientmodel.MetricNameLabel: scrapeDurationMetricName,
InstanceLabel: "example.url:80", clientmodel.InstanceLabel: "example.url:80",
clientmodel.JobLabel: "testjob", clientmodel.JobLabel: "testjob",
}, },
Timestamp: now, Timestamp: now,
@ -163,7 +150,11 @@ func TestTargetScrapeTimeout(t *testing.T) {
) )
defer server.Close() defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) jcfg := config.JobConfig{}
proto.SetDefaults(&jcfg.JobConfig)
var testTarget Target = newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
appender := nopAppender{} appender := nopAppender{}
// scrape once without timeout // scrape once without timeout
@ -205,25 +196,20 @@ func TestTargetScrape404(t *testing.T) {
) )
defer server.Close() defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
appender := nopAppender{} appender := nopAppender{}
want := errors.New("server returned HTTP status 404 Not Found") want := errors.New("server returned HTTP status 404 Not Found")
got := testTarget.(*target).scrape(appender) got := testTarget.scrape(appender)
if got == nil || want.Error() != got.Error() { if got == nil || want.Error() != got.Error() {
t.Fatalf("want err %q, got %q", want, got) t.Fatalf("want err %q, got %q", want, got)
} }
} }
func TestTargetRunScraperScrapes(t *testing.T) { func TestTargetRunScraperScrapes(t *testing.T) {
testTarget := target{ testTarget := newTestTarget("bad schema", 0, nil)
state: Unknown,
url: "bad schema", go testTarget.RunScraper(nopAppender{})
httpClient: utility.NewDeadlineClient(0),
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
go testTarget.RunScraper(nopAppender{}, time.Duration(time.Millisecond))
// Enough time for a scrape to happen. // Enough time for a scrape to happen.
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
@ -253,11 +239,7 @@ func BenchmarkScrape(b *testing.B) {
) )
defer server.Close() defer server.Close()
testTarget := NewTarget( var testTarget Target = newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"})
server.URL,
100*time.Millisecond,
clientmodel.LabelSet{"dings": "bums"},
)
appender := nopAppender{} appender := nopAppender{}
b.ResetTimer() b.ResetTimer()
@ -267,3 +249,25 @@ func BenchmarkScrape(b *testing.B) {
} }
} }
} }
func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *target {
t := &target{
url: &url.URL{
Scheme: "http",
Host: strings.TrimLeft(targetURL, "http://"),
Path: "/metrics",
},
deadline: deadline,
scrapeInterval: 1 * time.Millisecond,
httpClient: utility.NewDeadlineClient(deadline),
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
t.baseLabels = clientmodel.LabelSet{
clientmodel.InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier()),
}
for baseLabel, baseValue := range baseLabels {
t.baseLabels[baseLabel] = baseValue
}
return t
}

View File

@ -14,6 +14,8 @@
package retrieval package retrieval
import ( import (
"fmt"
"strings"
"sync" "sync"
"github.com/golang/glog" "github.com/golang/glog"
@ -21,132 +23,385 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
pb "github.com/prometheus/prometheus/config/generated"
"github.com/prometheus/prometheus/retrieval/discovery"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
// TargetManager manages all scrape targets. All methods are goroutine-safe. // A TargetProvider provides information about target groups. It maintains a set
type TargetManager interface { // of sources from which TargetGroups can originate. Whenever a target provider
AddTarget(job config.JobConfig, t Target) // detects a potential change it sends the TargetGroup through its provided channel.
ReplaceTargets(job config.JobConfig, newTargets []Target) //
Remove(t Target) // The TargetProvider does not have to guarantee that an actual change happened.
AddTargetsFromConfig(config config.Config) // It does guarantee that it sends the new TargetGroup whenever a change happens.
// On startup it sends all TargetGroups it can see.
type TargetProvider interface {
// Sources returns the source identifiers the provider is currently aware of.
Sources() []string
// Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider
// if no more updates will be sent.
Run(chan<- *config.TargetGroup)
// Stop terminates any potential computation of the target provider. The
// channel received on Run must be closed afterwards.
Stop() Stop()
Pools() map[string]*TargetPool // Returns a copy of the name -> TargetPool mapping.
} }
type targetManager struct { // TargetManager maintains a set of targets, starts and stops their scraping and
sync.Mutex // Protects poolByJob. // creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
m sync.RWMutex
globalLabels clientmodel.LabelSet globalLabels clientmodel.LabelSet
sampleAppender storage.SampleAppender sampleAppender storage.SampleAppender
poolsByJob map[string]*TargetPool running bool
// Targets by their source ID.
targets map[string][]Target
// Providers and configs by their job name.
// TODO(fabxc): turn this into map[*ScrapeConfig][]TargetProvider eventually.
providers map[string][]TargetProvider
configs map[string]config.JobConfig
} }
// NewTargetManager returns a newly initialized TargetManager ready to use. // NewTargetManager creates a new TargetManager based on the given config.
func NewTargetManager(sampleAppender storage.SampleAppender, globalLabels clientmodel.LabelSet) TargetManager { func NewTargetManager(cfg config.Config, sampleAppender storage.SampleAppender) (*TargetManager, error) {
return &targetManager{ tm := &TargetManager{
sampleAppender: sampleAppender, sampleAppender: sampleAppender,
globalLabels: globalLabels, targets: make(map[string][]Target),
poolsByJob: make(map[string]*TargetPool),
} }
if err := tm.applyConfig(cfg); err != nil {
return nil, err
}
return tm, nil
} }
func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool { // Run starts background processing to handle target updates.
targetPool, ok := m.poolsByJob[job.GetName()] func (tm *TargetManager) Run() {
glog.Info("Starting target manager...")
if !ok { sources := map[string]struct{}{}
var provider TargetProvider
if job.SdName != nil { for name, provs := range tm.providers {
provider = NewSdTargetProvider(job, m.globalLabels) for _, p := range provs {
jcfg := tm.configs[name]
ch := make(chan *config.TargetGroup)
go tm.handleTargetUpdates(tm.configs[name], ch)
for _, src := range p.Sources() {
src = fullSource(jcfg, src)
sources[src] = struct{}{}
} }
interval := job.ScrapeInterval() // Run the target provider after cleanup of the stale targets is done.
targetPool = NewTargetPool(provider, m.sampleAppender, interval) defer func(c chan *config.TargetGroup) {
glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName()) go p.Run(c)
}(ch)
m.poolsByJob[job.GetName()] = targetPool }
go targetPool.Run()
} }
return targetPool tm.removeTargets(func(src string) bool {
if _, ok := sources[src]; ok {
return false
}
return true
})
tm.running = true
} }
func (m *targetManager) AddTarget(job config.JobConfig, t Target) { // handleTargetUpdates receives target group updates and handles them in the
m.Lock() // context of the given job config.
defer m.Unlock() func (tm *TargetManager) handleTargetUpdates(cfg config.JobConfig, ch <-chan *config.TargetGroup) {
for tg := range ch {
glog.V(1).Infof("Received potential update for target group %q", tg.Source)
targetPool := m.targetPoolForJob(job) if err := tm.updateTargetGroup(tg, cfg); err != nil {
targetPool.AddTarget(t) glog.Errorf("Error updating targets: %s", err)
m.poolsByJob[job.GetName()] = targetPool
}
func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) {
m.Lock()
defer m.Unlock()
targetPool := m.targetPoolForJob(job)
targetPool.ReplaceTargets(newTargets)
}
func (m *targetManager) Remove(t Target) {
panic("not implemented")
}
func (m *targetManager) AddTargetsFromConfig(config config.Config) {
for _, job := range config.Jobs() {
if job.SdName != nil {
m.Lock()
m.targetPoolForJob(job)
m.Unlock()
continue
}
for _, targetGroup := range job.TargetGroup {
baseLabels := clientmodel.LabelSet{
clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()),
}
for n, v := range m.globalLabels {
baseLabels[n] = v
}
if targetGroup.Labels != nil {
for _, label := range targetGroup.Labels.Label {
baseLabels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue())
}
}
for _, endpoint := range targetGroup.Target {
target := NewTarget(endpoint, job.ScrapeTimeout(), baseLabels)
m.AddTarget(job, target)
}
} }
} }
} }
func (m *targetManager) Stop() { // fullSource prepends the unique job name to the source.
m.Lock() //
defer m.Unlock() // Thus, oscilliating label sets for targets with the same source,
// but providers from different configs, are prevented.
func fullSource(cfg config.JobConfig, src string) string {
return cfg.GetName() + ":" + src
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
tm.stop(true)
}
// stop background processing of the target manager. If removeTargets is true,
// existing targets will be stopped and removed.
func (tm *TargetManager) stop(removeTargets bool) {
tm.m.Lock()
defer tm.m.Unlock()
if !tm.running {
return
}
glog.Info("Stopping target manager...") glog.Info("Stopping target manager...")
var wg sync.WaitGroup defer glog.Info("Target manager stopped.")
for j, p := range m.poolsByJob {
wg.Add(1) for _, provs := range tm.providers {
go func(j string, p *TargetPool) { for _, p := range provs {
defer wg.Done()
glog.Infof("Stopping target pool %q...", j)
p.Stop() p.Stop()
glog.Infof("Target pool %q stopped.", j) }
}(j, p) }
if removeTargets {
tm.removeTargets(nil)
}
tm.running = false
}
// removeTargets stops and removes targets for sources where f(source) is true
// or if f is nil. This method is not thread-safe.
func (tm *TargetManager) removeTargets(f func(string) bool) {
if f == nil {
f = func(string) bool { return true }
}
var wg sync.WaitGroup
for src, targets := range tm.targets {
if !f(src) {
continue
}
wg.Add(len(targets))
for _, target := range targets {
go func(t Target) {
t.StopScraper()
wg.Done()
}(target)
}
delete(tm.targets, src)
} }
wg.Wait() wg.Wait()
glog.Info("Target manager stopped.")
} }
func (m *targetManager) Pools() map[string]*TargetPool { // updateTargetGroup creates new targets for the group and replaces the old targets
m.Lock() // for the source ID.
defer m.Unlock() func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg config.JobConfig) error {
newTargets, err := tm.targetsFromGroup(tgroup, cfg)
result := make(map[string]*TargetPool, len(m.poolsByJob)) if err != nil {
for k, v := range m.poolsByJob { return err
result[k] = v
} }
return result src := fullSource(cfg, tgroup.Source)
tm.m.Lock()
defer tm.m.Unlock()
oldTargets, ok := tm.targets[src]
if ok {
var wg sync.WaitGroup
// Replace the old targets with the new ones while keeping the state
// of intersecting targets.
for i, tnew := range newTargets {
var match Target
for j, told := range oldTargets {
if told == nil {
continue
}
if tnew.InstanceIdentifier() == told.InstanceIdentifier() {
match = told
oldTargets[j] = nil
break
}
}
// Update the exisiting target and discard the new equivalent.
// Otherwise start scraping the new target.
if match != nil {
// Updating is blocked during a scrape. We don't want those wait times
// to build up.
wg.Add(1)
go func(t Target) {
match.Update(cfg, t.BaseLabels())
wg.Done()
}(tnew)
newTargets[i] = match
} else {
go tnew.RunScraper(tm.sampleAppender)
}
}
// Remove all old targets that disappeared.
for _, told := range oldTargets {
if told != nil {
wg.Add(1)
go func(t Target) {
t.StopScraper()
wg.Done()
}(told)
}
}
wg.Wait()
} else {
// The source ID is new, start all target scrapers.
for _, tnew := range newTargets {
go tnew.RunScraper(tm.sampleAppender)
}
}
if len(newTargets) > 0 {
tm.targets[src] = newTargets
} else {
delete(tm.targets, src)
}
return nil
}
// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string][]Target {
tm.m.RLock()
defer tm.m.RUnlock()
pools := map[string][]Target{}
for _, ts := range tm.targets {
for _, t := range ts {
job := string(t.BaseLabels()[clientmodel.JobLabel])
pools[job] = append(pools[job], t)
}
}
return pools
}
// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg config.Config) error {
tm.stop(false)
// Even if updating the config failed, we want to continue rather than stop scraping anything.
defer tm.Run()
if err := tm.applyConfig(cfg); err != nil {
glog.Warningf("Error updating config, changes not applied: %s", err)
return err
}
return nil
}
func (tm *TargetManager) applyConfig(cfg config.Config) error {
// Only apply changes if everything was successful.
providers := map[string][]TargetProvider{}
configs := map[string]config.JobConfig{}
for _, jcfg := range cfg.Jobs() {
provs, err := ProvidersFromConfig(jcfg)
if err != nil {
return err
}
configs[jcfg.GetName()] = jcfg
providers[jcfg.GetName()] = provs
}
tm.m.Lock()
defer tm.m.Unlock()
tm.globalLabels = cfg.GlobalLabels()
tm.providers = providers
tm.configs = configs
return nil
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg config.JobConfig) ([]Target, error) {
tm.m.RLock()
defer tm.m.RUnlock()
targets := make([]Target, 0, len(tg.Targets))
for i, labels := range tg.Targets {
for ln, lv := range tg.Labels {
if _, ok := labels[ln]; !ok {
labels[ln] = lv
}
}
for ln, lv := range tm.globalLabels {
if _, ok := labels[ln]; !ok {
labels[ln] = lv
}
}
address, ok := labels[clientmodel.AddressLabel]
if !ok {
return nil, fmt.Errorf("Instance %d in target group %s has no address", i, tg)
}
if _, ok := labels[clientmodel.JobLabel]; !ok {
labels[clientmodel.JobLabel] = clientmodel.LabelValue(cfg.GetName())
}
for ln := range labels {
// There are currently no internal labels we want to take over to time series.
if strings.HasPrefix(string(ln), clientmodel.ReservedLabelPrefix) {
delete(labels, ln)
}
}
targets = append(targets, NewTarget(string(address), cfg, labels))
}
return targets, nil
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.JobConfig) ([]TargetProvider, error) {
var providers []TargetProvider
if name := cfg.GetSdName(); name != "" {
dnsSD := discovery.NewDNSDiscovery(name, cfg.SDRefreshInterval())
providers = append(providers, dnsSD)
}
if tgs := cfg.GetTargetGroup(); tgs != nil {
static := NewStaticProvider(tgs)
providers = append(providers, static)
}
return providers, nil
}
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
}
// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*pb.TargetGroup) *StaticProvider {
prov := &StaticProvider{}
for i, tg := range groups {
g := &config.TargetGroup{
Source: fmt.Sprintf("static:%d", i),
Labels: clientmodel.LabelSet{},
}
for _, pair := range tg.GetLabels().GetLabel() {
g.Labels[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue())
}
for _, t := range tg.GetTarget() {
g.Targets = append(g.Targets, clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(t),
})
}
prov.TargetGroups = append(prov.TargetGroups, g)
}
return prov
}
// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) {
for _, tg := range sd.TargetGroups {
ch <- tg
}
close(ch) // This provider never sends any updates.
}
// Stop implements the TargetProvider interface.
func (sd *StaticProvider) Stop() {}
// TargetGroups returns the provider's target groups.
func (sd *StaticProvider) Sources() (srcs []string) {
for _, tg := range sd.TargetGroups {
srcs = append(srcs, tg.Source)
}
return srcs
} }

View File

@ -14,6 +14,7 @@
package retrieval package retrieval
import ( import (
"reflect"
"testing" "testing"
"time" "time"
@ -21,110 +22,247 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
pb "github.com/prometheus/prometheus/config/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
pb "github.com/prometheus/prometheus/config/generated"
) )
type fakeTarget struct { func TestTargetManagerChan(t *testing.T) {
scrapeCount int testJob1 := pb.JobConfig{
lastScrape time.Time
interval time.Duration
}
func (t fakeTarget) LastError() error {
return nil
}
func (t fakeTarget) URL() string {
return "fake"
}
func (t fakeTarget) InstanceIdentifier() string {
return "fake"
}
func (t fakeTarget) GlobalURL() string {
return t.URL()
}
func (t fakeTarget) BaseLabels() clientmodel.LabelSet {
return clientmodel.LabelSet{}
}
func (t fakeTarget) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet {
return clientmodel.LabelSet{}
}
func (t fakeTarget) Interval() time.Duration {
return t.interval
}
func (t fakeTarget) LastScrape() time.Time {
return t.lastScrape
}
func (t fakeTarget) scrape(storage.SampleAppender) error {
t.scrapeCount++
return nil
}
func (t fakeTarget) RunScraper(storage.SampleAppender, time.Duration) {
return
}
func (t fakeTarget) StopScraper() {
return
}
func (t fakeTarget) State() TargetState {
return Healthy
}
func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {}
func (t *fakeTarget) Ingest(clientmodel.Samples) error { return nil }
func testTargetManager(t testing.TB) {
targetManager := NewTargetManager(nopAppender{}, nil)
testJob1 := config.JobConfig{
JobConfig: pb.JobConfig{
Name: proto.String("test_job1"), Name: proto.String("test_job1"),
ScrapeInterval: proto.String("1m"), ScrapeInterval: proto.String("1m"),
TargetGroup: []*pb.TargetGroup{
{Target: []string{"example.org:80", "example.com:80"}},
}, },
} }
testJob2 := config.JobConfig{ prov1 := &fakeTargetProvider{
JobConfig: pb.JobConfig{ sources: []string{"src1", "src2"},
update: make(chan *config.TargetGroup),
}
targetManager := &TargetManager{
sampleAppender: nopAppender{},
providers: map[string][]TargetProvider{
*testJob1.Name: []TargetProvider{prov1},
},
configs: map[string]config.JobConfig{
*testJob1.Name: config.JobConfig{testJob1},
},
targets: make(map[string][]Target),
}
go targetManager.Run()
defer targetManager.Stop()
sequence := []struct {
tgroup *config.TargetGroup
expected map[string][]clientmodel.LabelSet
}{
{
tgroup: &config.TargetGroup{
Source: "src1",
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1234"},
{clientmodel.AddressLabel: "test-2:1234", "label": "set"},
{clientmodel.AddressLabel: "test-3:1234"},
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src2",
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1235"},
{clientmodel.AddressLabel: "test-2:1235"},
{clientmodel.AddressLabel: "test-3:1235"},
},
Labels: clientmodel.LabelSet{"group": "label"},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
},
"test_job1:src2": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1235", "group": "label"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src2",
Targets: []clientmodel.LabelSet{},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src1",
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1234", "added": "label"},
{clientmodel.AddressLabel: "test-3:1234"},
{clientmodel.AddressLabel: "test-4:1234", "fancy": "label"},
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234", "added": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-4:1234", "fancy": "label"},
},
},
},
}
for i, step := range sequence {
prov1.update <- step.tgroup
<-time.After(1 * time.Millisecond)
if len(targetManager.targets) != len(step.expected) {
t.Fatalf("step %d: sources mismatch %v, %v", targetManager.targets, step.expected)
}
for source, actTargets := range targetManager.targets {
expTargets, ok := step.expected[source]
if !ok {
t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets)
}
for _, expt := range expTargets {
found := false
for _, actt := range actTargets {
if reflect.DeepEqual(expt, actt.BaseLabels()) {
found = true
break
}
}
if !found {
t.Errorf("step %d: expected target %v not found in actual targets", i, expt)
}
}
}
}
}
func TestTargetManagerConfigUpdate(t *testing.T) {
testJob1 := &pb.JobConfig{
Name: proto.String("test_job1"),
ScrapeInterval: proto.String("1m"),
TargetGroup: []*pb.TargetGroup{
{Target: []string{"example.org:80", "example.com:80"}},
},
}
testJob2 := &pb.JobConfig{
Name: proto.String("test_job2"), Name: proto.String("test_job2"),
ScrapeInterval: proto.String("1m"), ScrapeInterval: proto.String("1m"),
TargetGroup: []*pb.TargetGroup{
{Target: []string{"example.org:8080", "example.com:8081"}},
{Target: []string{"test.com:1234"}},
}, },
} }
target1GroupA := &fakeTarget{ sequence := []struct {
interval: time.Minute, jobConfigs []*pb.JobConfig
} expected map[string][]clientmodel.LabelSet
target2GroupA := &fakeTarget{ }{
interval: time.Minute, {
jobConfigs: []*pb.JobConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"},
},
},
}, {
jobConfigs: []*pb.JobConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"},
},
},
}, {
jobConfigs: []*pb.JobConfig{testJob1, testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"},
},
"test_job2:static:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081"},
},
"test_job2:static:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "test.com:1234"},
},
},
}, {
jobConfigs: []*pb.JobConfig{},
expected: map[string][]clientmodel.LabelSet{},
}, {
jobConfigs: []*pb.JobConfig{testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job2:static:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081"},
},
"test_job2:static:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "test.com:1234"},
},
},
},
} }
targetManager.AddTarget(testJob1, target1GroupA) targetManager, err := NewTargetManager(config.Config{}, nopAppender{})
targetManager.AddTarget(testJob1, target2GroupA) if err != nil {
t.Fatal(err)
}
targetManager.Run()
defer targetManager.Stop()
target1GroupB := &fakeTarget{ for i, step := range sequence {
interval: time.Minute * 2, cfg := pb.PrometheusConfig{
Job: step.jobConfigs,
}
err := targetManager.ApplyConfig(config.Config{cfg})
if err != nil {
t.Fatal(err)
} }
targetManager.AddTarget(testJob2, target1GroupB) <-time.After(1 * time.Millisecond)
}
func TestTargetManager(t *testing.T) { if len(targetManager.targets) != len(step.expected) {
testTargetManager(t) t.Fatalf("step %d: sources mismatch %v, %v", targetManager.targets, step.expected)
} }
func BenchmarkTargetManager(b *testing.B) { for source, actTargets := range targetManager.targets {
for i := 0; i < b.N; i++ { expTargets, ok := step.expected[source]
testTargetManager(b) if !ok {
t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets)
}
for _, expt := range expTargets {
found := false
for _, actt := range actTargets {
if reflect.DeepEqual(expt, actt.BaseLabels()) {
found = true
break
}
}
if !found {
t.Errorf("step %d: expected target %v for %q not found in actual targets", i, expt, source)
}
}
}
} }
} }

View File

@ -1,164 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"sort"
"sync"
"time"
"github.com/golang/glog"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/utility"
)
const (
targetAddQueueSize = 100
targetReplaceQueueSize = 1
)
// TargetPool is a pool of targets for the same job.
type TargetPool struct {
sync.RWMutex
manager TargetManager
targetsByURL map[string]Target
interval time.Duration
sampleAppender storage.SampleAppender
addTargetQueue chan Target
targetProvider TargetProvider
stopping, stopped chan struct{}
}
// NewTargetPool creates a TargetPool, ready to be started by calling Run.
func NewTargetPool(p TargetProvider, app storage.SampleAppender, i time.Duration) *TargetPool {
return &TargetPool{
interval: i,
sampleAppender: app,
targetsByURL: make(map[string]Target),
addTargetQueue: make(chan Target, targetAddQueueSize),
targetProvider: p,
stopping: make(chan struct{}),
stopped: make(chan struct{}),
}
}
// Run starts the target pool. It returns when the target pool has stopped
// (after calling Stop). Run is usually called as a goroutine.
func (p *TargetPool) Run() {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.targetProvider != nil {
targets, err := p.targetProvider.Targets()
if err != nil {
glog.Warningf("Error looking up targets, keeping old list: %s", err)
} else {
p.ReplaceTargets(targets)
}
}
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case <-p.stopping:
p.ReplaceTargets([]Target{})
close(p.stopped)
return
}
}
}
// Stop stops the target pool and returns once the shutdown is complete.
func (p *TargetPool) Stop() {
close(p.stopping)
<-p.stopped
}
// AddTarget adds a target by queuing it in the target queue.
func (p *TargetPool) AddTarget(target Target) {
p.addTargetQueue <- target
}
func (p *TargetPool) addTarget(target Target) {
p.Lock()
defer p.Unlock()
p.targetsByURL[target.URL()] = target
go target.RunScraper(p.sampleAppender, p.interval)
}
// ReplaceTargets replaces the old targets by the provided new ones but reuses
// old targets that are also present in newTargets to preserve scheduling and
// health state. Targets no longer present are stopped.
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.Lock()
defer p.Unlock()
newTargetURLs := make(utility.Set)
for _, newTarget := range newTargets {
newTargetURLs.Add(newTarget.URL())
oldTarget, ok := p.targetsByURL[newTarget.URL()]
if ok {
oldTarget.SetBaseLabelsFrom(newTarget)
} else {
p.targetsByURL[newTarget.URL()] = newTarget
go newTarget.RunScraper(p.sampleAppender, p.interval)
}
}
var wg sync.WaitGroup
for k, oldTarget := range p.targetsByURL {
if !newTargetURLs.Has(k) {
wg.Add(1)
go func(k string, oldTarget Target) {
defer wg.Done()
glog.V(1).Infof("Stopping scraper for target %s...", k)
oldTarget.StopScraper()
glog.V(1).Infof("Scraper for target %s stopped.", k)
}(k, oldTarget)
delete(p.targetsByURL, k)
}
}
wg.Wait()
}
type targetsByURL []Target
func (s targetsByURL) Len() int {
return len(s)
}
func (s targetsByURL) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s targetsByURL) Less(i, j int) bool {
return s[i].URL() < s[j].URL()
}
// Targets returns a sorted copy of the current target list.
func (p *TargetPool) Targets() []Target {
p.RLock()
defer p.RUnlock()
targets := make(targetsByURL, 0, len(p.targetsByURL))
for _, v := range p.targetsByURL {
targets = append(targets, v)
}
sort.Sort(targets)
return targets
}

View File

@ -1,164 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"net/http"
"testing"
"time"
)
func testTargetPool(t testing.TB) {
type expectation struct {
size int
}
type input struct {
url string
scheduledFor time.Time
}
type output struct {
url string
}
var scenarios = []struct {
name string
inputs []input
outputs []output
}{
{
name: "empty",
inputs: []input{},
outputs: []output{},
},
{
name: "single element",
inputs: []input{
{
url: "single1",
},
},
outputs: []output{
{
url: "single1",
},
},
},
{
name: "plural schedules",
inputs: []input{
{
url: "plural1",
},
{
url: "plural2",
},
},
outputs: []output{
{
url: "plural1",
},
{
url: "plural2",
},
},
},
}
for i, scenario := range scenarios {
pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))
for _, input := range scenario.inputs {
target := target{
url: input.url,
httpClient: &http.Client{},
}
pool.addTarget(&target)
}
if len(pool.targetsByURL) != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByURL))
} else {
for j, output := range scenario.outputs {
if target, ok := pool.targetsByURL[output.url]; !ok {
t.Errorf("%s %d.%d. expected Target url to be %s but was %s", scenario.name, i, j, output.url, target.URL())
}
}
if len(pool.targetsByURL) != len(scenario.outputs) {
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByURL))
}
}
}
}
func TestTargetPool(t *testing.T) {
testTargetPool(t)
}
func TestTargetPoolReplaceTargets(t *testing.T) {
pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))
oldTarget1 := &target{
url: "example1",
state: Unhealthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
httpClient: &http.Client{},
}
oldTarget2 := &target{
url: "example2",
state: Unhealthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
httpClient: &http.Client{},
}
newTarget1 := &target{
url: "example1",
state: Healthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
httpClient: &http.Client{},
}
newTarget2 := &target{
url: "example3",
state: Healthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
httpClient: &http.Client{},
}
pool.addTarget(oldTarget1)
pool.addTarget(oldTarget2)
pool.ReplaceTargets([]Target{newTarget1, newTarget2})
if len(pool.targetsByURL) != 2 {
t.Errorf("Expected 2 elements in pool, had %d", len(pool.targetsByURL))
}
if pool.targetsByURL["example1"].State() != oldTarget1.State() {
t.Errorf("target1 channel has changed")
}
if pool.targetsByURL["example3"].State() == oldTarget2.State() {
t.Errorf("newTarget2 channel same as oldTarget2's")
}
}
func BenchmarkTargetPool(b *testing.B) {
for i := 0; i < b.N; i++ {
testTargetPool(b)
}
}

View File

@ -30,7 +30,7 @@ type PrometheusStatusHandler struct {
Config string Config string
Flags map[string]string Flags map[string]string
RuleManager manager.RuleManager RuleManager manager.RuleManager
TargetPools map[string]*retrieval.TargetPool TargetPools func() map[string][]retrieval.Target
Birth time.Time Birth time.Time
PathPrefix string PathPrefix string

View File

@ -33,7 +33,7 @@
<h2>Targets</h2> <h2>Targets</h2>
<table class="table table-condensed table-bordered table-striped table-hover"> <table class="table table-condensed table-bordered table-striped table-hover">
{{$stateToClass := .TargetStateToClass}} {{$stateToClass := .TargetStateToClass}}
{{range $job, $pool := .TargetPools}} {{range $job, $pool := call .TargetPools}}
<thead> <thead>
<tr><th colspan="5" class="job_header">{{$job}}</th></tr> <tr><th colspan="5" class="job_header">{{$job}}</th></tr>
<tr> <tr>
@ -45,7 +45,7 @@
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{{range $pool.Targets}} {{range $pool}}
<tr> <tr>
<td> <td>
<a href="{{.GlobalURL}}">{{.URL}}</a> <a href="{{.GlobalURL}}">{{.URL}}</a>