Merge pull request #1404 from prometheus/scraperef2

Retrieval refactoring
pull/1465/head
Fabian Reinartz 2016-03-06 22:17:00 +01:00
commit 5b9e85e556
18 changed files with 1527 additions and 2017 deletions

View File

@ -15,7 +15,6 @@ package discovery
import ( import (
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -24,6 +23,7 @@ import (
consul "github.com/hashicorp/consul/api" consul "github.com/hashicorp/consul/api"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -113,52 +113,24 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) {
return cd, nil return cd, nil
} }
// Sources implements the TargetProvider interface.
func (cd *ConsulDiscovery) Sources() []string {
clientConf := *cd.clientConf
clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second}
client, err := consul.NewClient(&clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err))
}
srvs, _, err := client.Catalog().Services(nil)
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
return nil
}
cd.mu.Lock()
defer cd.mu.Unlock()
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) == 0 || ok {
srcs = append(srcs, name)
}
}
return srcs
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
defer cd.stop() defer cd.stop()
update := make(chan *consulService, 10) update := make(chan *consulService, 10)
go cd.watchServices(update, done) go cd.watchServices(update, ctx.Done())
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case srv := <-update: case srv := <-update:
if srv.removed { if srv.removed {
close(srv.done) close(srv.done)
// Send clearing update. // Send clearing update.
ch <- config.TargetGroup{Source: srv.name} ch <- []*config.TargetGroup{{Source: srv.name}}
break break
} }
// Launch watcher for the service. // Launch watcher for the service.
@ -244,7 +216,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
// watchService retrieves updates about srv from Consul's service endpoint. // watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch. // On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) {
catalog := cd.client.Catalog() catalog := cd.client.Catalog()
for { for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
@ -288,7 +260,11 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.Tar
default: default:
// Continue. // Continue.
} }
ch <- srv.tgroup // TODO(fabxc): do a copy for now to avoid races. The integration
// needs needs some general cleanup.
tg := srv.tgroup
ch <- []*config.TargetGroup{&tg}
cd.mu.Unlock() cd.mu.Unlock()
} }
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -91,7 +92,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (dd *DNSDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(dd.interval) ticker := time.NewTicker(dd.interval)
@ -104,23 +105,15 @@ func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
select { select {
case <-ticker.C: case <-ticker.C:
dd.refreshAll(ch) dd.refreshAll(ch)
case <-done: case <-ctx.Done():
return return
} }
} }
} }
// Sources implements the TargetProvider interface. func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) {
func (dd *DNSDiscovery) Sources() []string {
var srcs []string
for _, name := range dd.names {
srcs = append(srcs, name)
}
return srcs
}
func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(dd.names)) wg.Add(len(dd.names))
for _, name := range dd.names { for _, name := range dd.names {
go func(n string) { go func(n string) {
@ -130,10 +123,11 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
wg.Done() wg.Done()
}(name) }(name)
} }
wg.Wait() wg.Wait()
} }
func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { func (dd *DNSDiscovery) refresh(name string, ch chan<- []*config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype) response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
@ -141,7 +135,8 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error
return err return err
} }
var tg config.TargetGroup tg := &config.TargetGroup{}
for _, record := range response.Answer { for _, record := range response.Answer {
target := model.LabelValue("") target := model.LabelValue("")
switch addr := record.(type) { switch addr := record.(type) {
@ -166,7 +161,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error
} }
tg.Source = name tg.Source = name
ch <- tg ch <- []*config.TargetGroup{tg}
return nil return nil
} }

View File

@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/aws/defaults" "github.com/aws/aws-sdk-go/aws/defaults"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -46,7 +47,6 @@ const (
// the TargetProvider interface. // the TargetProvider interface.
type EC2Discovery struct { type EC2Discovery struct {
aws *aws.Config aws *aws.Config
done chan struct{}
interval time.Duration interval time.Duration
port int port int
} }
@ -62,14 +62,13 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
Region: &conf.Region, Region: &conf.Region,
Credentials: creds, Credentials: creds,
}, },
done: make(chan struct{}),
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
port: conf.Port, port: conf.Port,
} }
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(ed.interval) ticker := time.NewTicker(ed.interval)
@ -80,7 +79,7 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- *tg ch <- []*config.TargetGroup{tg}
} }
for { for {
@ -90,19 +89,14 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- *tg ch <- []*config.TargetGroup{tg}
} }
case <-done: case <-ctx.Done():
return return
} }
} }
} }
// Sources implements the TargetProvider interface.
func (ed *EC2Discovery) Sources() []string {
return []string{*ed.aws.Region}
}
func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) { func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) {
ec2s := ec2.New(ed.aws) ec2s := ec2.New(ed.aws)
tg := &config.TargetGroup{ tg := &config.TargetGroup{

View File

@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"gopkg.in/fsnotify.v1" "gopkg.in/fsnotify.v1"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
@ -53,23 +54,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
} }
} }
// Sources implements the TargetProvider interface.
func (fd *FileDiscovery) Sources() []string {
var srcs []string
// As we allow multiple target groups per file we have no choice
// but to parse them all.
for _, p := range fd.listFiles() {
tgroups, err := readFile(p)
if err != nil {
log.Errorf("Error reading file %q: %s", p, err)
}
for _, tg := range tgroups {
srcs = append(srcs, tg.Source)
}
}
return srcs
}
// listFiles returns a list of all files that match the configured patterns. // listFiles returns a list of all files that match the configured patterns.
func (fd *FileDiscovery) listFiles() []string { func (fd *FileDiscovery) listFiles() []string {
var paths []string var paths []string
@ -103,7 +87,7 @@ func (fd *FileDiscovery) watchFiles() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
defer fd.stop() defer fd.stop()
@ -123,11 +107,11 @@ func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
// Stopping has priority over refreshing. Thus we wrap the actual select // Stopping has priority over refreshing. Thus we wrap the actual select
// clause to always catch done signals. // clause to always catch done signals.
select { select {
case <-done: case <-ctx.Done():
return return
default: default:
select { select {
case <-done: case <-ctx.Done():
return return
case event := <-fd.watcher.Events: case event := <-fd.watcher.Events:
@ -188,7 +172,7 @@ func (fd *FileDiscovery) stop() {
// refresh reads all files matching the discovery's patterns and sends the respective // refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. // updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) {
ref := map[string]int{} ref := map[string]int{}
for _, p := range fd.listFiles() { for _, p := range fd.listFiles() {
tgroups, err := readFile(p) tgroups, err := readFile(p)
@ -198,9 +182,8 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
ref[p] = fd.lastRefresh[p] ref[p] = fd.lastRefresh[p]
continue continue
} }
for _, tg := range tgroups { ch <- tgroups
ch <- *tg
}
ref[p] = len(tgroups) ref[p] = len(tgroups)
} }
// Send empty updates for sources that disappeared. // Send empty updates for sources that disappeared.
@ -208,7 +191,9 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
for i := m; i < n; i++ { for i := m; i < n; i++ {
ch <- config.TargetGroup{Source: fileSource(f, i)} ch <- []*config.TargetGroup{
{Source: fileSource(f, i)},
}
} }
} }
} }

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -27,17 +28,17 @@ func testFileSD(t *testing.T, ext string) {
conf.RefreshInterval = model.Duration(1 * time.Hour) conf.RefreshInterval = model.Duration(1 * time.Hour)
var ( var (
fsd = NewFileDiscovery(&conf) fsd = NewFileDiscovery(&conf)
ch = make(chan config.TargetGroup) ch = make(chan []*config.TargetGroup)
done = make(chan struct{}) ctx, cancel = context.WithCancel(context.Background())
) )
go fsd.Run(ch, done) go fsd.Run(ctx, ch)
select { select {
case <-time.After(25 * time.Millisecond): case <-time.After(25 * time.Millisecond):
// Expected. // Expected.
case tg := <-ch: case tgs := <-ch:
t.Fatalf("Unexpected target group in file discovery: %s", tg) t.Fatalf("Unexpected target groups in file discovery: %s", tgs)
} }
newf, err := os.Create("fixtures/_test" + ext) newf, err := os.Create("fixtures/_test" + ext)
@ -58,37 +59,37 @@ func testFileSD(t *testing.T, ext string) {
} }
newf.Close() newf.Close()
// The files contain two target groups which are read and sent in order. // The files contain two target groups.
select { select {
case <-time.After(15 * time.Second): case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none") t.Fatalf("Expected new target group but got none")
case tg := <-ch: case tgs := <-ch:
tg := tgs[0]
if _, ok := tg.Labels["foo"]; !ok { if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed") t.Fatalf("Label not parsed")
} }
if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) { if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) {
t.Fatalf("Unexpected target group %s", tg) t.Fatalf("Unexpected target group %s", tg)
} }
}
select { tg = tgs[1]
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) { if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) {
t.Fatalf("Unexpected target group %s", tg) t.Fatalf("Unexpected target groups %s", tg)
} }
} }
// Based on unknown circumstances, sometimes fsnotify will trigger more events in // Based on unknown circumstances, sometimes fsnotify will trigger more events in
// some runs (which might be empty, chains of different operations etc.). // some runs (which might be empty, chains of different operations etc.).
// We have to drain those (as the target manager would) to avoid deadlocking and must // We have to drain those (as the target manager would) to avoid deadlocking and must
// not try to make sense of it all... // not try to make sense of it all...
drained := make(chan struct{}) drained := make(chan struct{})
go func() { go func() {
for tg := range ch { for tgs := range ch {
// Below we will change the file to a bad syntax. Previously extracted target // Below we will change the file to a bad syntax. Previously extracted target
// groups must not be deleted via sending an empty target group. // groups must not be deleted via sending an empty target group.
if len(tg.Targets) == 0 { if len(tgs[0].Targets) == 0 {
t.Errorf("Unexpected empty target group received: %s", tg) t.Errorf("Unexpected empty target groups received: %s", tgs)
} }
} }
close(drained) close(drained)
@ -107,6 +108,6 @@ func testFileSD(t *testing.T, ext string) {
os.Rename(newf.Name(), "fixtures/_test"+ext) os.Rename(newf.Name(), "fixtures/_test"+ext)
close(done) cancel()
<-drained <-drained
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
@ -94,75 +95,35 @@ func (kd *Discovery) Initialize() error {
return nil return nil
} }
// Sources implements the TargetProvider interface.
func (kd *Discovery) Sources() []string {
sourceNames := make([]string, 0, len(kd.apiServers))
for _, apiServer := range kd.apiServers {
sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host)
}
nodes, _, err := kd.getNodes()
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to initialize Kubernetes nodes: %s", err)
return []string{}
}
sourceNames = append(sourceNames, kd.nodeSources(nodes)...)
services, _, err := kd.getServices()
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to initialize Kubernetes services: %s", err)
return []string{}
}
sourceNames = append(sourceNames, kd.serviceSources(services)...)
return sourceNames
}
func (kd *Discovery) nodeSources(nodes map[string]*Node) []string {
var sourceNames []string
for name := range nodes {
sourceNames = append(sourceNames, nodesTargetGroupName+":"+name)
}
return sourceNames
}
func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string {
var sourceNames []string
for _, ns := range services {
for _, service := range ns {
sourceNames = append(sourceNames, serviceSource(service))
}
}
return sourceNames
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
if tg := kd.updateAPIServersTargetGroup(); tg != nil { // Send an initial full view.
select { // TODO(fabxc): this does not include all available services and service
case ch <- *tg: // endpoints yet. Service endpoints were also missing in the previous Sources() method.
case <-done: var all []*config.TargetGroup
return
} all = append(all, kd.updateAPIServersTargetGroup())
all = append(all, kd.updateNodesTargetGroup())
select {
case ch <- all:
case <-ctx.Done():
return
} }
retryInterval := time.Duration(kd.Conf.RetryInterval) retryInterval := time.Duration(kd.Conf.RetryInterval)
update := make(chan interface{}, 10) update := make(chan interface{}, 10)
go kd.watchNodes(update, done, retryInterval) go kd.watchNodes(update, ctx.Done(), retryInterval)
go kd.startServiceWatch(update, done, retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval)
var tg *config.TargetGroup var tg *config.TargetGroup
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case event := <-update: case event := <-update:
switch obj := event.(type) { switch obj := event.(type) {
@ -181,8 +142,8 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
} }
select { select {
case ch <- *tg: case ch <- []*config.TargetGroup{tg}:
case <-done: case <-ctx.Done():
return return
} }
} }

View File

@ -17,6 +17,8 @@ import (
"time" "time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon" "github.com/prometheus/prometheus/retrieval/discovery/marathon"
) )
@ -40,25 +42,13 @@ func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
} }
} }
// Sources implements the TargetProvider interface.
func (md *MarathonDiscovery) Sources() []string {
var sources []string
tgroups, err := md.fetchTargetGroups()
if err == nil {
for source := range tgroups {
sources = append(sources, source)
}
}
return sources
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case <-time.After(md.refreshInterval): case <-time.After(md.refreshInterval):
err := md.updateServices(ch) err := md.updateServices(ch)
@ -69,23 +59,24 @@ func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struc
} }
} }
func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups() targetMap, err := md.fetchTargetGroups()
if err != nil { if err != nil {
return err return err
} }
// Update services which are still present all := make([]*config.TargetGroup, 0, len(targetMap))
for _, tg := range targetMap { for _, tg := range targetMap {
ch <- *tg all = append(all, tg)
} }
ch <- all
// Remove services which did disappear // Remove services which did disappear
for source := range md.lastRefresh { for source := range md.lastRefresh {
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
log.Debugf("Removing group for %s", source) log.Debugf("Removing group for %s", source)
ch <- config.TargetGroup{Source: source} ch <- []*config.TargetGroup{{Source: source}}
} }
} }

View File

@ -19,6 +19,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon" "github.com/prometheus/prometheus/retrieval/discovery/marathon"
@ -26,8 +27,8 @@ import (
var marathonValidLabel = map[string]string{"prometheus": "yes"} var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) {
ch := make(chan config.TargetGroup) ch := make(chan []*config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{ md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"}, Servers: []string{"http://localhost:8080"},
}) })
@ -60,7 +61,9 @@ func TestMarathonSDEmptyList(t *testing.T) {
go func() { go func() {
select { select {
case tg := <-ch: case tg := <-ch:
t.Fatalf("Got group: %v", tg) if len(tg) > 0 {
t.Fatalf("Got group: %v", tg)
}
default: default:
} }
}() }()
@ -96,7 +99,9 @@ func TestMarathonSDSendGroup(t *testing.T) {
}) })
go func() { go func() {
select { select {
case tg := <-ch: case tgs := <-ch:
tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
t.Fatalf("Wrong target group name: %s", tg.Source) t.Fatalf("Wrong target group name: %s", tg.Source)
} }
@ -121,9 +126,10 @@ func TestMarathonSDRemoveApp(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
}) })
go func() { go func() {
up1 := <-ch up1 := (<-ch)[0]
up2 := <-ch up2 := (<-ch)[0]
if up2.Source != up1.Source { if up2.Source != up1.Source {
t.Fatalf("Source is different: %s", up2) t.Fatalf("Source is different: %s", up2)
if len(up2.Targets) > 0 { if len(up2.Targets) > 0 {
@ -145,33 +151,25 @@ func TestMarathonSDRemoveApp(t *testing.T) {
} }
} }
func TestMarathonSDSources(t *testing.T) {
_, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
sources := md.Sources()
if len(sources) != 1 {
t.Fatalf("Wrong number of sources: %s", sources)
}
}
func TestMarathonSDRunAndStop(t *testing.T) { func TestMarathonSDRunAndStop(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
}) })
md.refreshInterval = time.Millisecond * 10 md.refreshInterval = time.Millisecond * 10
done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
select { select {
case <-ch: case <-ch:
close(done) cancel()
case <-time.After(md.refreshInterval * 3): case <-time.After(md.refreshInterval * 3):
close(done) cancel()
t.Fatalf("Update took too long.") t.Fatalf("Update took too long.")
} }
}() }()
md.Run(ch, done)
md.Run(ctx, ch)
select { select {
case <-ch: case <-ch:
default: default:

View File

@ -21,6 +21,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/treecache" "github.com/prometheus/prometheus/util/treecache"
@ -47,7 +48,7 @@ type NerveDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- config.TargetGroup sdUpdates *chan<- []*config.TargetGroup
updates chan treecache.ZookeeperTreeCacheEvent updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache treeCaches []*treecache.ZookeeperTreeCache
} }
@ -73,17 +74,6 @@ func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery {
return sd return sd
} }
// Sources implements the TargetProvider interface.
func (sd *NerveDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *NerveDiscovery) processUpdates() { func (sd *NerveDiscovery) processUpdates() {
defer sd.conn.Close() defer sd.conn.Close()
for event := range sd.updates { for event := range sd.updates {
@ -104,7 +94,7 @@ func (sd *NerveDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- *tg *sd.sdUpdates <- []*config.TargetGroup{tg}
} }
} }
@ -114,17 +104,22 @@ func (sd *NerveDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *NerveDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *NerveDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- *targetGroup all := make([]*config.TargetGroup, 0, len(sd.sources))
for _, tg := range sd.sources {
all = append(all, tg)
} }
ch <- all
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch
sd.mu.Unlock() sd.mu.Unlock()
<-done <-ctx.Done()
for _, tc := range sd.treeCaches { for _, tc := range sd.treeCaches {
tc.Stop() tc.Stop()
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
@ -57,7 +58,7 @@ type ServersetDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- config.TargetGroup sdUpdates *chan<- []*config.TargetGroup
updates chan treecache.ZookeeperTreeCacheEvent updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache treeCaches []*treecache.ZookeeperTreeCache
} }
@ -83,17 +84,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
return sd return sd
} }
// Sources implements the TargetProvider interface.
func (sd *ServersetDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *ServersetDiscovery) processUpdates() { func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close() defer sd.conn.Close()
for event := range sd.updates { for event := range sd.updates {
@ -114,7 +104,7 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- *tg *sd.sdUpdates <- []*config.TargetGroup{tg}
} }
} }
@ -124,17 +114,22 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *ServersetDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- *targetGroup all := make([]*config.TargetGroup, 0, len(sd.sources))
for _, tg := range sd.sources {
all = append(all, tg)
} }
ch <- all
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch
sd.mu.Unlock() sd.mu.Unlock()
<-done <-ctx.Done()
for _, tc := range sd.treeCaches { for _, tc := range sd.treeCaches {
tc.Stop() tc.Stop()
} }
@ -142,8 +137,8 @@ func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan stru
func parseServersetMember(data []byte, path string) (*model.LabelSet, error) { func parseServersetMember(data []byte, path string) (*model.LabelSet, error) {
member := serversetMember{} member := serversetMember{}
err := json.Unmarshal(data, &member)
if err != nil { if err := json.Unmarshal(data, &member); err != nil {
return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err) return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err)
} }

450
retrieval/scrape.go Normal file
View File

@ -0,0 +1,450 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
)
const (
scrapeHealthMetricName = "up"
scrapeDurationMetricName = "scrape_duration_seconds"
// Capacity of the channel to buffer samples during ingestion.
ingestedSamplesCap = 256
// Constants for instrumentation.
namespace = "prometheus"
interval = "interval"
)
var (
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)
func init() {
prometheus.MustRegister(targetIntervalLength)
prometheus.MustRegister(targetSkippedScrapes)
}
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appender storage.SampleAppender
ctx context.Context
mtx sync.RWMutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
// set of hashes.
targets map[uint64]*Target
loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
}
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
client, err := newHTTPClient(cfg)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
return &scrapePool{
appender: app,
config: cfg,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newScrapeLoop,
}
}
// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
var wg sync.WaitGroup
sp.mtx.Lock()
defer sp.mtx.Unlock()
for fp, l := range sp.loops {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(l)
delete(sp.loops, fp)
delete(sp.targets, fp)
}
wg.Wait()
}
// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have fully terminated.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
client, err := newHTTPClient(cfg)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
sp.config = cfg
sp.client = client
var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
)
for fp, oldLoop := range sp.loops {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
)
wg.Add(1)
go func(oldLoop, newLoop loop) {
oldLoop.stop()
wg.Done()
go newLoop.run(interval, timeout, nil)
}(oldLoop, newLoop)
sp.loops[fp] = newLoop
}
wg.Wait()
}
// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
var (
uniqueTargets = map[uint64]struct{}{}
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
)
for _, t := range targets {
hash := t.hash()
uniqueTargets[hash] = struct{}{}
if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
sp.targets[hash] = t
sp.loops[hash] = l
go l.run(interval, timeout, nil)
}
}
var wg sync.WaitGroup
// Stop and remove old targets and scraper loops.
for hash := range sp.targets {
if _, ok := uniqueTargets[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(sp.loops[hash])
delete(sp.loops, hash)
delete(sp.targets, hash)
}
}
// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
wg.Wait()
}
// sampleAppender returns an appender for ingested samples from the target.
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
app := sp.appender
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
}
}
if sp.config.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
}
return app
}
// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: sp.appender,
labels: target.Labels(),
}
}
// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
scrape(ctx context.Context, ts time.Time) (model.Samples, error)
report(start time.Time, dur time.Duration, err error)
offset(interval time.Duration) time.Duration
}
// targetScraper implements the scraper interface for a target.
type targetScraper struct {
*Target
client *http.Client
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", acceptHeader)
resp, err := ctxhttp.Do(ctx, s.client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
}
var (
allSamples = make(model.Samples, 0, 200)
decSamples = make(model.Vector, 0, 50)
)
sdec := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
},
}
for {
if err = sdec.Decode(&decSamples); err != nil {
break
}
allSamples = append(allSamples, decSamples...)
decSamples = decSamples[:0]
}
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
}
return allSamples, err
}
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
run(interval, timeout time.Duration, errc chan<- error)
stop()
}
type scrapeLoop struct {
scraper scraper
appender storage.SampleAppender
reportAppender storage.SampleAppender
done chan struct{}
ctx context.Context
cancel func()
}
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop {
sl := &scrapeLoop{
scraper: sc,
appender: app,
reportAppender: reportApp,
done: make(chan struct{}),
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
return sl
}
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
defer close(sl.done)
select {
case <-time.After(sl.scraper.offset(interval)):
// Continue after a scraping offset.
case <-sl.ctx.Done():
return
}
var last time.Time
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-sl.ctx.Done():
return
default:
}
if !sl.appender.NeedsThrottling() {
var (
start = time.Now()
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
)
// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
float64(time.Since(last)) / float64(time.Second), // Sub-second precision.
)
}
samples, err := sl.scraper.scrape(scrapeCtx, start)
if err == nil {
sl.append(samples)
} else if errc != nil {
errc <- err
}
sl.report(start, time.Since(start), err)
last = start
} else {
targetSkippedScrapes.WithLabelValues(interval.String()).Inc()
}
select {
case <-sl.ctx.Done():
return
case <-ticker.C:
}
}
}
func (sl *scrapeLoop) stop() {
sl.cancel()
<-sl.done
}
func (sl *scrapeLoop) append(samples model.Samples) {
numOutOfOrder := 0
for _, s := range samples {
if err := sl.appender.Append(s); err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
log.Warnf("Error inserting sample: %s", err)
}
}
}
if numOutOfOrder > 0 {
log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
}
}
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, err error) {
sl.scraper.report(start, duration, err)
ts := model.TimeFromUnixNano(start.UnixNano())
var health model.SampleValue
if err == nil {
health = 1
}
healthSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeHealthMetricName,
},
Timestamp: ts,
Value: health,
}
durationSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
},
Timestamp: ts,
Value: model.SampleValue(float64(duration) / float64(time.Second)),
}
sl.reportAppender.Append(healthSample)
sl.reportAppender.Append(durationSample)
}

587
retrieval/scrape_test.go Normal file
View File

@ -0,0 +1,587 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
)
func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppender{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(cfg, app)
)
if a, ok := sp.appender.(*nopAppender); !ok || a != app {
t.Fatalf("Wrong sample appender")
}
if sp.config != cfg {
t.Fatalf("Wrong scrape config")
}
if sp.newLoop == nil {
t.Fatalf("newLoop function not initialized")
}
}
type testLoop struct {
startFunc func(interval, timeout time.Duration, errc chan<- error)
stopFunc func()
}
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
l.startFunc(interval, timeout, errc)
}
func (l *testLoop) stop() {
l.stopFunc()
}
func TestScrapePoolStop(t *testing.T) {
sp := &scrapePool{
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
}
var mtx sync.Mutex
stopped := map[uint64]bool{}
numTargets := 20
// Stopping the scrape pool must call stop() on all scrape loops,
// clean them and the respective targets up. It must wait until each loop's
// stop function returned before returning itself.
for i := 0; i < numTargets; i++ {
t := &Target{
labels: model.LabelSet{
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
},
}
l := &testLoop{}
l.stopFunc = func() {
time.Sleep(time.Duration(i*20) * time.Millisecond)
mtx.Lock()
stopped[t.hash()] = true
mtx.Unlock()
}
sp.targets[t.hash()] = t
sp.loops[t.hash()] = l
}
done := make(chan struct{})
stopTime := time.Now()
go func() {
sp.stop()
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("scrapeLoop.stop() did not return as expected")
case <-done:
// This should have taken at least as long as the last target slept.
if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
}
}
mtx.Lock()
if len(stopped) != numTargets {
t.Fatalf("Expected 20 stopped loops, got %d", len(stopped))
}
mtx.Unlock()
if len(sp.targets) > 0 {
t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
}
if len(sp.loops) > 0 {
t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))
}
}
func TestScrapePoolReload(t *testing.T) {
var mtx sync.Mutex
numTargets := 20
stopped := map[uint64]bool{}
reloadCfg := &config.ScrapeConfig{
ScrapeInterval: model.Duration(3 * time.Second),
ScrapeTimeout: model.Duration(2 * time.Second),
}
// On starting to run, new loops created on reload check whether their preceeding
// equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop {
l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second {
t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval)
}
if timeout != 2*time.Second {
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
}
mtx.Lock()
if !stopped[s.(*targetScraper).hash()] {
t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
}
mtx.Unlock()
}
return l
}
sp := &scrapePool{
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
// loops and start new ones. A new loop must not be started before the preceeding
// one terminated.
for i := 0; i < numTargets; i++ {
t := &Target{
labels: model.LabelSet{
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
},
}
l := &testLoop{}
l.stopFunc = func() {
time.Sleep(time.Duration(i*20) * time.Millisecond)
mtx.Lock()
stopped[t.hash()] = true
mtx.Unlock()
}
sp.targets[t.hash()] = t
sp.loops[t.hash()] = l
}
done := make(chan struct{})
beforeTargets := map[uint64]*Target{}
for h, t := range sp.targets {
beforeTargets[h] = t
}
reloadTime := time.Now()
go func() {
sp.reload(reloadCfg)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("scrapeLoop.reload() did not return as expected")
case <-done:
// This should have taken at least as long as the last target slept.
if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
}
}
mtx.Lock()
if len(stopped) != numTargets {
t.Fatalf("Expected 20 stopped loops, got %d", stopped)
}
mtx.Unlock()
if !reflect.DeepEqual(sp.targets, beforeTargets) {
t.Fatalf("Reloading affected target states unexpectedly")
}
if len(sp.loops) != numTargets {
t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops))
}
}
func TestScrapePoolReportAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{}
sp := newScrapePool(cfg, app)
cfg.HonorLabels = false
wrapped := sp.reportAppender(target)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if rl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
}
cfg.HonorLabels = true
wrapped = sp.reportAppender(target)
hl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if hl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
}
}
func TestScrapePoolSampleAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{}
sp := newScrapePool(cfg, app)
cfg.HonorLabels = false
wrapped := sp.sampleAppender(target)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
re, ok := rl.SampleAppender.(relabelAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
}
if re.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
}
cfg.HonorLabels = true
wrapped = sp.sampleAppender(target)
hl, ok := wrapped.(honorLabelsAppender)
if !ok {
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
}
re, ok = hl.SampleAppender.(relabelAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
}
if re.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
}
}
func TestScrapeLoopStop(t *testing.T) {
scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
// again before having started properly.
// Stopping not-yet-started loops must block until the run method was called and exited.
// The run method must exit immediately.
stopDone := make(chan struct{})
go func() {
sl.stop()
close(stopDone)
}()
select {
case <-stopDone:
t.Fatalf("Stopping terminated before run exited successfully")
case <-time.After(500 * time.Millisecond):
}
// Running the scrape loop must exit before calling the scraper even once.
scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) {
t.Fatalf("scraper was called for terminated scrape loop")
return nil, nil
}
runDone := make(chan struct{})
go func() {
sl.run(0, 0, nil)
close(runDone)
}()
select {
case <-runDone:
case <-time.After(1 * time.Second):
t.Fatalf("Running terminated scrape loop did not exit")
}
select {
case <-stopDone:
case <-time.After(1 * time.Second):
t.Fatalf("Stopping did not terminate after running exited")
}
}
func TestScrapeLoopRun(t *testing.T) {
var (
signal = make(chan struct{})
errc = make(chan error)
scraper = &testScraper{}
app = &nopAppender{}
reportApp = &nopAppender{}
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp)
// The loop must terminate during the initial offset if the context
// is canceled.
scraper.offsetDur = time.Hour
go func() {
sl.run(time.Second, time.Hour, errc)
signal <- struct{}{}
}()
// Wait to make sure we are actually waiting on the offset.
time.Sleep(1 * time.Second)
cancel()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Cancelation during initial offset failed")
case err := <-errc:
t.Fatalf("Unexpected error: %s", err)
}
// The provided timeout must cause cancelation of the context passed down to the
// scraper. The scraper has to respect the context.
scraper.offsetDur = 0
block := make(chan struct{})
scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) {
select {
case <-block:
case <-ctx.Done():
return nil, ctx.Err()
}
return nil, nil
}
ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, reportApp)
go func() {
sl.run(time.Second, 100*time.Millisecond, errc)
signal <- struct{}{}
}()
select {
case err := <-errc:
if err != context.DeadlineExceeded {
t.Fatalf("Expected timeout error but got: %s", err)
}
case <-time.After(3 * time.Second):
t.Fatalf("Expected timeout error but got none")
}
// We already caught the timeout error and are certainly in the loop.
// Let the scrapes returns immediately to cause no further timeout errors
// and check whether canceling the parent context terminates the loop.
close(block)
cancel()
select {
case <-signal:
// Loop terminated as expected.
case err := <-errc:
t.Fatalf("Unexpected error: %s", err)
case <-time.After(3 * time.Second):
t.Fatalf("Loop did not terminate on context cancelation")
}
}
func TestTargetScraperScrapeOK(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("metric_a 1\nmetric_b 2\n"))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
panic(err)
}
ts := &targetScraper{
Target: &Target{
labels: model.LabelSet{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
},
},
client: http.DefaultClient,
}
now := time.Now()
samples, err := ts.scrape(context.Background(), now)
if err != nil {
t.Fatalf("Unexpected scrape error: %s", err)
}
expectedSamples := model.Samples{
{
Metric: model.Metric{"__name__": "metric_a"},
Timestamp: model.TimeFromUnixNano(now.UnixNano()),
Value: 1,
},
{
Metric: model.Metric{"__name__": "metric_b"},
Timestamp: model.TimeFromUnixNano(now.UnixNano()),
Value: 2,
},
}
if !reflect.DeepEqual(samples, expectedSamples) {
t.Errorf("Scraped samples did not match served metrics")
t.Errorf("Expected: %v", expectedSamples)
t.Fatalf("Got: %v", samples)
}
}
func TestTargetScrapeScrapeCancel(t *testing.T) {
block := make(chan struct{})
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-block
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
panic(err)
}
ts := &targetScraper{
Target: &Target{
labels: model.LabelSet{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
},
},
client: http.DefaultClient,
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
go func() {
if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled {
t.Fatalf("Expected context cancelation error but got: %s", err)
}
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("Scrape function did not return unexpectedly")
case <-done:
}
// If this is closed in a defer above the function the test server
// does not terminate and the test doens't complete.
close(block)
}
func TestTargetScrapeScrapeNotFound(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
panic(err)
}
ts := &targetScraper{
Target: &Target{
labels: model.LabelSet{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
},
},
client: http.DefaultClient,
}
if _, err := ts.scrape(context.Background(), time.Now()); !strings.Contains(err.Error(), "404") {
t.Fatalf("Expected \"404 NotFound\" error but got: %s", err)
}
}
// testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function.
type testScraper struct {
offsetDur time.Duration
lastStart time.Time
lastDuration time.Duration
lastError error
samples model.Samples
scrapeErr error
scrapeFunc func(context.Context, time.Time) (model.Samples, error)
}
func (ts *testScraper) offset(interval time.Duration) time.Duration {
return ts.offsetDur
}
func (ts *testScraper) report(start time.Time, duration time.Duration, err error) {
ts.lastStart = start
ts.lastDuration = duration
ts.lastError = err
}
func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) {
if ts.scrapeFunc != nil {
return ts.scrapeFunc(ctx, t)
}
return ts.samples, ts.scrapeErr
}

View File

@ -14,9 +14,8 @@
package retrieval package retrieval
import ( import (
"errors"
"fmt" "fmt"
"io" "hash/fnv"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -24,200 +23,46 @@ import (
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
const (
scrapeHealthMetricName = "up"
scrapeDurationMetricName = "scrape_duration_seconds"
// Capacity of the channel to buffer samples during ingestion.
ingestedSamplesCap = 256
// Constants for instrumentation.
namespace = "prometheus"
interval = "interval"
)
var (
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)
func init() {
prometheus.MustRegister(targetIntervalLength)
prometheus.MustRegister(targetSkippedScrapes)
}
// TargetHealth describes the health state of a target. // TargetHealth describes the health state of a target.
type TargetHealth int type TargetHealth string
func (t TargetHealth) String() string {
switch t {
case HealthUnknown:
return "unknown"
case HealthGood:
return "up"
case HealthBad:
return "down"
}
panic("unknown state")
}
func (t TargetHealth) value() model.SampleValue {
if t == HealthGood {
return 1
}
return 0
}
// The possible health states of a target based on the last performed scrape.
const ( const (
// HealthUnknown is the state of a Target before it is first scraped. HealthUnknown TargetHealth = "unknown"
HealthUnknown TargetHealth = iota HealthGood TargetHealth = "up"
// HealthGood is the state of a Target that has been successfully scraped. HealthBad TargetHealth = "down"
HealthGood
// HealthBad is the state of a Target that was scraped unsuccessfully.
HealthBad
) )
// TargetStatus contains information about the current status of a scrape target.
type TargetStatus struct {
lastError error
lastScrape time.Time
health TargetHealth
mu sync.RWMutex
}
// LastError returns the error encountered during the last scrape.
func (ts *TargetStatus) LastError() error {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastError
}
// LastScrape returns the time of the last scrape.
func (ts *TargetStatus) LastScrape() time.Time {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastScrape
}
// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.health
}
func (ts *TargetStatus) setLastScrape(t time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.lastScrape = t
}
func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if err == nil {
ts.health = HealthGood
} else {
ts.health = HealthBad
}
ts.lastError = err
}
// Target refers to a singular HTTP or HTTPS endpoint. // Target refers to a singular HTTP or HTTPS endpoint.
type Target struct { type Target struct {
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Mutex protects the members below.
sync.RWMutex
scrapeConfig *config.ScrapeConfig
// Labels before any processing. // Labels before any processing.
metaLabels model.LabelSet metaLabels model.LabelSet
// Any labels that are added to this target and its metrics. // Any labels that are added to this target and its metrics.
labels model.LabelSet labels model.LabelSet
// Additional URL parmeters that are part of the target URL.
params url.Values
// The HTTP client used to scrape the target's endpoint. mtx sync.RWMutex
httpClient *http.Client lastError error
lastScrape time.Time
health TargetHealth
} }
// NewTarget creates a reasonably configured target for querying. // NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) { func NewTarget(labels, metaLabels model.LabelSet, params url.Values) *Target {
t := &Target{ return &Target{
status: &TargetStatus{}, labels: labels,
scraperStopping: make(chan struct{}), metaLabels: metaLabels,
scraperStopped: make(chan struct{}), params: params,
health: HealthUnknown,
} }
err := t.Update(cfg, labels, metaLabels)
return t, err
}
// Status returns the status of the target.
func (t *Target) Status() *TargetStatus {
return t.status
}
// Update overwrites settings in the target that are derived from the job config
// it belongs to.
func (t *Target) Update(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) error {
t.Lock()
t.scrapeConfig = cfg
t.labels = labels
t.metaLabels = metaLabels
t.Unlock()
httpClient, err := t.client()
if err != nil {
return fmt.Errorf("cannot create HTTP client: %s", err)
}
t.Lock()
t.httpClient = httpClient
t.Unlock()
return nil
} }
func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
@ -265,15 +110,16 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
} }
func (t *Target) String() string { func (t *Target) String() string {
return t.host() return t.URL().String()
} }
// fingerprint returns an identifying hash for the target. // hash returns an identifying hash for the target.
func (t *Target) fingerprint() model.Fingerprint { func (t *Target) hash() uint64 {
t.RLock() h := fnv.New64a()
defer t.RUnlock() h.Write([]byte(t.labels.Fingerprint().String()))
h.Write([]byte(t.URL().String()))
return t.labels.Fingerprint() return h.Sum64()
} }
// offset returns the time until the next scrape cycle for the target. // offset returns the time until the next scrape cycle for the target.
@ -282,7 +128,7 @@ func (t *Target) offset(interval time.Duration) time.Duration {
var ( var (
base = now % int64(interval) base = now % int64(interval)
offset = uint64(t.fingerprint()) % uint64(interval) offset = t.hash() % uint64(interval)
next = base + int64(offset) next = base + int64(offset)
) )
@ -292,92 +138,27 @@ func (t *Target) offset(interval time.Duration) time.Duration {
return time.Duration(next) return time.Duration(next)
} }
func (t *Target) client() (*http.Client, error) { // Labels returns a copy of the set of all public labels of the target.
t.RLock() func (t *Target) Labels() model.LabelSet {
defer t.RUnlock() lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
return newHTTPClient(t.scrapeConfig) if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
} lset[ln] = lv
func (t *Target) interval() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeInterval)
}
func (t *Target) timeout() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeTimeout)
}
func (t *Target) scheme() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.SchemeLabel])
}
func (t *Target) host() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.AddressLabel])
}
func (t *Target) path() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.MetricsPathLabel])
}
// wrapAppender wraps a SampleAppender for samples ingested from the target.
// RLock must be acquired by the caller.
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
} }
} }
return lset
if t.scrapeConfig.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
}
}
return app
} }
// wrapReportingAppender wraps an appender for target status report samples. // MetaLabels returns a copy of the target's labels before any processing.
// It ignores any relabeling rules set for the target. func (t *Target) MetaLabels() model.LabelSet {
// RLock must not be acquired by the caller. return t.metaLabels.Clone()
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: app,
labels: t.Labels(),
}
} }
// URL returns a copy of the target's URL. // URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL { func (t *Target) URL() *url.URL {
t.RLock()
defer t.RUnlock()
params := url.Values{} params := url.Values{}
for k, v := range t.scrapeConfig.Params { for k, v := range t.params {
params[k] = make([]string, len(v)) params[k] = make([]string, len(v))
copy(params[k], v) copy(params[k], v)
} }
@ -402,191 +183,51 @@ func (t *Target) URL() *url.URL {
} }
} }
// InstanceIdentifier returns the identifier for the target. func (t *Target) report(start time.Time, dur time.Duration, err error) {
func (t *Target) InstanceIdentifier() string { t.mtx.Lock()
return t.host() defer t.mtx.Unlock()
}
// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)
lastScrapeInterval := t.interval()
log.Debugf("Starting scraper for target %v...", t)
select {
case <-time.After(t.offset(lastScrapeInterval)):
// Continue after scraping offset.
case <-t.scraperStopping:
return
}
ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop()
t.scrape(sampleAppender)
// Explanation of the contraption below:
//
// In case t.scraperStopping has something to receive, we want to read
// from that channel rather than starting a new scrape (which might take very
// long). That's why the outer select has no ticker.C. Should t.scraperStopping
// not have anything to receive, we go into the inner select, where ticker.C
// is in the mix.
for {
select {
case <-t.scraperStopping:
return
default:
select {
case <-t.scraperStopping:
return
case <-ticker.C:
took := time.Since(t.status.LastScrape())
intervalStr := lastScrapeInterval.String()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if iv := t.interval(); iv != lastScrapeInterval {
ticker.Stop()
ticker = time.NewTicker(iv)
lastScrapeInterval = iv
}
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
if sampleAppender.NeedsThrottling() {
targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
t.status.setLastError(errSkippedScrape)
continue
}
t.scrape(sampleAppender)
}
}
}
}
// StopScraper implements Target.
func (t *Target) StopScraper() {
log.Debugf("Stopping scraper for target %v...", t)
close(t.scraperStopping)
<-t.scraperStopped
log.Debugf("Scraper for target %v stopped.", t)
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *Target) scrape(appender storage.SampleAppender) error {
var (
err error
start = time.Now()
)
defer func(appender storage.SampleAppender) {
t.report(appender, start, time.Since(start), err)
}(appender)
t.RLock()
appender = t.wrapAppender(appender)
client := t.httpClient
t.RUnlock()
req, err := http.NewRequest("GET", t.URL().String(), nil)
if err != nil {
return err
}
req.Header.Add("Accept", acceptHeader)
ctx, _ := context.WithTimeout(context.Background(), t.timeout())
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("server returned HTTP status %s", resp.Status)
}
dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header))
sdec := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(start.UnixNano()),
},
}
var (
samples model.Vector
numOutOfOrder int
logger = log.With("target", t.InstanceIdentifier())
)
for {
if err = sdec.Decode(&samples); err != nil {
break
}
for _, s := range samples {
err := appender.Append(s)
if err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
logger.With("sample", s).Warnf("Error inserting sample: %s", err)
}
}
}
}
if numOutOfOrder > 0 {
logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
}
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
}
return err
}
func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) {
t.status.setLastScrape(start)
t.status.setLastError(err)
ts := model.TimeFromUnixNano(start.UnixNano())
var health model.SampleValue
if err == nil { if err == nil {
health = 1 t.health = HealthGood
} else {
t.health = HealthBad
} }
healthSample := &model.Sample{ t.lastError = err
Metric: model.Metric{ t.lastScrape = start
model.MetricNameLabel: scrapeHealthMetricName,
},
Timestamp: ts,
Value: health,
}
durationSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
},
Timestamp: ts,
Value: model.SampleValue(float64(duration) / float64(time.Second)),
}
app = t.wrapReportingAppender(app)
app.Append(healthSample)
app.Append(durationSample)
} }
// LastError returns the error encountered during the last scrape.
func (t *Target) LastError() error {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastError
}
// LastScrape returns the time of the last scrape.
func (t *Target) LastScrape() time.Time {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastScrape
}
// Health returns the last known health state of the target.
func (t *Target) Health() TargetHealth {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.health
}
// Targets is a sortable list of targets.
type Targets []*Target
func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
// Merges the ingested sample's metric with the label set. On a collision the // Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'. // value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct { type ruleLabelsAppender struct {
@ -643,36 +284,3 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels() model.LabelSet {
t.RLock()
defer t.RUnlock()
return t.unlockedLabels()
}
// unlockedLabels does the same as Labels but does not lock the mutex (useful
// for internal usage when the mutex is already locked).
func (t *Target) unlockedLabels() model.LabelSet {
lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
lset[ln] = lv
}
}
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}
return lset
}
// MetaLabels returns a copy of the target's labels before any processing.
func (t *Target) MetaLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
return t.metaLabels.Clone()
}

View File

@ -16,7 +16,6 @@ package retrieval
import ( import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -35,9 +34,8 @@ import (
func TestTargetLabels(t *testing.T) { func TestTargetLabels(t *testing.T) {
target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"}) target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"})
want := model.LabelSet{ want := model.LabelSet{
model.JobLabel: "some_job", model.JobLabel: "some_job",
model.InstanceLabel: "example.com:80", "foo": "bar",
"foo": "bar",
} }
got := target.Labels() got := target.Labels()
if !reflect.DeepEqual(want, got) { if !reflect.DeepEqual(want, got) {
@ -91,484 +89,36 @@ func TestTargetOffset(t *testing.T) {
} }
} }
func TestTargetWrapReportingAppender(t *testing.T) { func TestTargetURL(t *testing.T) {
cfg := &config.ScrapeConfig{ params := url.Values{
MetricRelabelConfigs: []*config.RelabelConfig{ "abc": []string{"foo", "bar", "baz"},
{}, {}, {}, "xyz": []string{"hoo"},
}, }
labels := model.LabelSet{
model.AddressLabel: "example.com:1234",
model.SchemeLabel: "https",
model.MetricsPathLabel: "/metricz",
"__param_abc": "overwrite",
"__param_cde": "huu",
}
target := NewTarget(labels, labels, params)
// The reserved labels are concatenated into a full URL. The first value for each
// URL query parameter can be set/modified via labels as well.
expectedParams := url.Values{
"abc": []string{"overwrite", "bar", "baz"},
"cde": []string{"huu"},
"xyz": []string{"hoo"},
}
expectedURL := url.URL{
Scheme: "https",
Host: "example.com:1234",
Path: "/metricz",
RawQuery: expectedParams.Encode(),
} }
target := newTestTarget("example.com:80", 10*time.Millisecond, nil) if u := target.URL(); !reflect.DeepEqual(u.String(), expectedURL.String()) {
target.scrapeConfig = cfg t.Fatalf("Expected URL %q but got %q", expectedURL, u)
app := &nopAppender{}
cfg.HonorLabels = false
wrapped := target.wrapReportingAppender(app)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if rl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
}
cfg.HonorLabels = true
wrapped = target.wrapReportingAppender(app)
hl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if hl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
}
}
func TestTargetWrapAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
target.scrapeConfig = cfg
app := &nopAppender{}
cfg.HonorLabels = false
wrapped := target.wrapAppender(app)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
re, ok := rl.SampleAppender.(relabelAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
}
if re.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
}
cfg.HonorLabels = true
wrapped = target.wrapAppender(app)
hl, ok := wrapped.(honorLabelsAppender)
if !ok {
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
}
re, ok = hl.SampleAppender.(relabelAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
}
if re.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
}
}
func TestOverwriteLabels(t *testing.T) {
type test struct {
metric string
resultNormal model.Metric
resultHonor model.Metric
}
var tests []test
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
for _, test := range tests {
w.Write([]byte(test.metric))
w.Write([]byte(" 1\n"))
}
},
),
)
defer server.Close()
addr := model.LabelValue(strings.Split(server.URL, "://")[1])
tests = []test{
{
metric: `foo{}`,
resultNormal: model.Metric{
model.MetricNameLabel: "foo",
model.InstanceLabel: addr,
},
resultHonor: model.Metric{
model.MetricNameLabel: "foo",
model.InstanceLabel: addr,
},
},
{
metric: `foo{instance=""}`,
resultNormal: model.Metric{
model.MetricNameLabel: "foo",
model.InstanceLabel: addr,
},
resultHonor: model.Metric{
model.MetricNameLabel: "foo",
},
},
{
metric: `foo{instance="other_instance"}`,
resultNormal: model.Metric{
model.MetricNameLabel: "foo",
model.InstanceLabel: addr,
model.ExportedLabelPrefix + model.InstanceLabel: "other_instance",
},
resultHonor: model.Metric{
model.MetricNameLabel: "foo",
model.InstanceLabel: "other_instance",
},
},
}
target := newTestTarget(server.URL, time.Second, nil)
target.scrapeConfig.HonorLabels = false
app := &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
}
for i, test := range tests {
if !reflect.DeepEqual(app.result[i].Metric, test.resultNormal) {
t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultNormal, app.result[i].Metric)
}
}
target.scrapeConfig.HonorLabels = true
app = &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
}
for i, test := range tests {
if !reflect.DeepEqual(app.result[i].Metric, test.resultHonor) {
t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultHonor, app.result[i].Metric)
}
}
}
func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := newTestTarget("bad schema", 0, nil)
testTarget.scrape(nopAppender{})
if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
}
func TestTargetScrapeWithThrottledStorage(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
for i := 0; i < 10; i++ {
w.Write([]byte(
fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
))
}
},
),
)
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"})
go testTarget.RunScraper(&collectResultAppender{throttled: true})
// Enough time for a scrape to happen.
time.Sleep(20 * time.Millisecond)
testTarget.StopScraper()
// Wait for it to take effect.
time.Sleep(20 * time.Millisecond)
if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
if testTarget.status.LastError() != errSkippedScrape {
t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError())
}
}
func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("test_metric_drop 0\n"))
w.Write([]byte("test_metric_relabel 1\n"))
},
),
)
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{})
testTarget.scrapeConfig.MetricRelabelConfigs = []*config.RelabelConfig{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: config.MustNewRegexp(".*drop.*"),
Action: config.RelabelDrop,
},
{
SourceLabels: model.LabelNames{"__name__"},
Regex: config.MustNewRegexp(".*(relabel|up).*"),
TargetLabel: "foo",
Replacement: "bar",
Action: config.RelabelReplace,
},
}
appender := &collectResultAppender{}
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
// Remove variables part of result.
for _, sample := range appender.result {
sample.Timestamp = 0
sample.Value = 0
}
expected := []*model.Sample{
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric_relabel",
"foo": "bar",
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
},
// The metrics about the scrape are not affected.
{
Metric: model.Metric{
model.MetricNameLabel: scrapeHealthMetricName,
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
},
{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
},
}
if !appender.result.Equal(expected) {
t.Fatalf("Expected and actual samples not equal. Expected: %s, actual: %s", expected, appender.result)
}
}
func TestTargetRecordScrapeHealth(t *testing.T) {
var (
testTarget = newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"})
now = model.Now()
appender = &collectResultAppender{}
)
testTarget.report(appender, now.Time(), 2*time.Second, nil)
result := appender.result
if len(result) != 2 {
t.Fatalf("Expected two samples, got %d", len(result))
}
actual := result[0]
expected := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeHealthMetricName,
model.InstanceLabel: "example.url:80",
model.JobLabel: "testjob",
},
Timestamp: now,
Value: 1,
}
if !actual.Equal(expected) {
t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual)
}
actual = result[1]
expected = &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
model.InstanceLabel: "example.url:80",
model.JobLabel: "testjob",
},
Timestamp: now,
Value: 2.0,
}
if !actual.Equal(expected) {
t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual)
}
}
func TestTargetScrapeTimeout(t *testing.T) {
signal := make(chan bool, 1)
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
<-signal
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte{})
},
),
)
defer server.Close()
testTarget := newTestTarget(server.URL, 50*time.Millisecond, model.LabelSet{})
appender := nopAppender{}
// scrape once without timeout
signal <- true
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
// let the deadline lapse
time.Sleep(55 * time.Millisecond)
// now scrape again
signal <- true
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.scrape(appender); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
}
// now scrape again without timeout
signal <- true
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
}
func TestTargetScrape404(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
},
),
)
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{})
appender := nopAppender{}
want := errors.New("server returned HTTP status 404 Not Found")
got := testTarget.scrape(appender)
if got == nil || want.Error() != got.Error() {
t.Fatalf("want err %q, got %q", want, got)
}
}
func TestTargetRunScraperScrapes(t *testing.T) {
testTarget := newTestTarget("bad schema", 0, nil)
go testTarget.RunScraper(nopAppender{})
// Enough time for a scrape to happen.
time.Sleep(20 * time.Millisecond)
if testTarget.status.LastScrape().IsZero() {
t.Errorf("Scrape hasn't occured.")
}
testTarget.StopScraper()
// Wait for it to take effect.
time.Sleep(20 * time.Millisecond)
last := testTarget.status.LastScrape()
// Enough time for a scrape to happen.
time.Sleep(20 * time.Millisecond)
if testTarget.status.LastScrape() != last {
t.Errorf("Scrape occured after it was stopped.")
}
}
func BenchmarkScrape(b *testing.B) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
},
),
)
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"})
appender := nopAppender{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := testTarget.scrape(appender); err != nil {
b.Fatal(err)
}
}
}
func TestURLParams(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte{})
r.ParseForm()
if r.Form["foo"][0] != "bar" {
t.Fatalf("URL parameter 'foo' had unexpected first value '%v'", r.Form["foo"][0])
}
if r.Form["foo"][1] != "baz" {
t.Fatalf("URL parameter 'foo' had unexpected second value '%v'", r.Form["foo"][1])
}
},
),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatal(err)
}
target, err := NewTarget(
&config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(1 * time.Second),
Scheme: serverURL.Scheme,
Params: url.Values{
"foo": []string{"bar", "baz"},
},
},
model.LabelSet{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
"__param_foo": "bar",
},
nil,
)
if err != nil {
t.Fatal(err)
}
app := &collectResultAppender{}
if err = target.scrape(app); err != nil {
t.Fatal(err)
} }
} }
@ -578,23 +128,9 @@ func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelS
labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://")) labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://"))
labels[model.MetricsPathLabel] = "/metrics" labels[model.MetricsPathLabel] = "/metrics"
t := &Target{ return &Target{
scrapeConfig: &config.ScrapeConfig{ labels: labels,
ScrapeInterval: model.Duration(time.Millisecond),
ScrapeTimeout: model.Duration(deadline),
},
labels: labels,
status: &TargetStatus{},
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
} }
var err error
if t.httpClient, err = t.client(); err != nil {
panic(err)
}
return t
} }
func TestNewHTTPBearerToken(t *testing.T) { func TestNewHTTPBearerToken(t *testing.T) {
@ -766,7 +302,7 @@ func newTLSConfig(t *testing.T) *tls.Config {
return tlsConfig return tlsConfig
} }
func TestNewTargetWithBadTLSConfig(t *testing.T) { func TestNewClientWithBadTLSConfig(t *testing.T) {
cfg := &config.ScrapeConfig{ cfg := &config.ScrapeConfig{
ScrapeTimeout: model.Duration(1 * time.Second), ScrapeTimeout: model.Duration(1 * time.Second),
TLSConfig: config.TLSConfig{ TLSConfig: config.TLSConfig{
@ -775,7 +311,7 @@ func TestNewTargetWithBadTLSConfig(t *testing.T) {
KeyFile: "testdata/nonexistent_client.key", KeyFile: "testdata/nonexistent_client.key",
}, },
} }
_, err := NewTarget(cfg, nil, nil) _, err := newHTTPClient(cfg)
if err == nil { if err == nil {
t.Fatalf("Expected error, got nil.") t.Fatalf("Expected error, got nil.")
} }

View File

@ -17,9 +17,11 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery" "github.com/prometheus/prometheus/retrieval/discovery"
@ -33,285 +35,102 @@ import (
// The TargetProvider does not have to guarantee that an actual change happened. // The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens. // It does guarantee that it sends the new TargetGroup whenever a change happens.
// //
// Sources() is guaranteed to be called exactly once before each call to Run(). // Providers must initially send all known target groups as soon as it can.
// On a call to Run() implementing types must send a valid target group for each of
// the sources they declared in the last call to Sources().
type TargetProvider interface { type TargetProvider interface {
// Sources returns the source identifiers the provider is currently aware of.
Sources() []string
// Run hands a channel to the target provider through which it can send // Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider // updated target groups. The channel must be closed by the target provider
// if no more updates will be sent. // if no more updates will be sent.
// On receiving from done Run must return. // On receiving from done Run must return.
Run(up chan<- config.TargetGroup, done <-chan struct{}) Run(ctx context.Context, up chan<- []*config.TargetGroup)
} }
// TargetManager maintains a set of targets, starts and stops their scraping and // TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various // creates the new targets based on the target groups it receives from various
// target providers. // target providers.
type TargetManager struct { type TargetManager struct {
mtx sync.RWMutex appender storage.SampleAppender
sampleAppender storage.SampleAppender scrapeConfigs []*config.ScrapeConfig
running bool
done chan struct{}
// Targets by their source ID. mtx sync.RWMutex
targets map[string][]*Target ctx context.Context
// Providers by the scrape configs they are derived from. cancel func()
providers map[*config.ScrapeConfig][]TargetProvider wg sync.WaitGroup
// Set of unqiue targets by scrape configuration.
targetSets map[string]*targetSet
} }
// NewTargetManager creates a new TargetManager. // NewTargetManager creates a new TargetManager.
func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager { func NewTargetManager(app storage.SampleAppender) *TargetManager {
tm := &TargetManager{ return &TargetManager{
sampleAppender: sampleAppender, appender: app,
targets: map[string][]*Target{}, targetSets: map[string]*targetSet{},
} }
return tm
}
// merge multiple target group channels into a single output channel.
func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate {
var wg sync.WaitGroup
out := make(chan targetGroupUpdate)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
redir := func(c <-chan targetGroupUpdate) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go redir(c)
}
// Close the out channel if all inbound channels are closed.
go func() {
wg.Wait()
close(out)
}()
return out
}
// targetGroupUpdate is a potentially changed/new target group
// for the given scrape configuration.
type targetGroupUpdate struct {
tg config.TargetGroup
scfg *config.ScrapeConfig
} }
// Run starts background processing to handle target updates. // Run starts background processing to handle target updates.
func (tm *TargetManager) Run() { func (tm *TargetManager) Run() {
log.Info("Starting target manager...") log.Info("Starting target manager...")
tm.done = make(chan struct{})
sources := map[string]struct{}{}
updates := []<-chan targetGroupUpdate{}
for scfg, provs := range tm.providers {
for _, prov := range provs {
// Get an initial set of available sources so we don't remove
// target groups from the last run that are still available.
for _, src := range prov.Sources() {
sources[src] = struct{}{}
}
tgc := make(chan config.TargetGroup)
// Run the target provider after cleanup of the stale targets is done.
defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) {
go prov.Run(tgc, done)
}(prov, tgc, tm.done)
tgupc := make(chan targetGroupUpdate)
updates = append(updates, tgupc)
go func(scfg *config.ScrapeConfig, done <-chan struct{}) {
defer close(tgupc)
for {
select {
case tg := <-tgc:
tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
case <-done:
return
}
}
}(scfg, tm.done)
}
}
// Merge all channels of incoming target group updates into a single
// one and keep applying the updates.
go tm.handleUpdates(merge(tm.done, updates...), tm.done)
tm.mtx.Lock() tm.mtx.Lock()
defer tm.mtx.Unlock()
// Remove old target groups that are no longer in the set of sources. tm.ctx, tm.cancel = context.WithCancel(context.Background())
tm.removeTargets(func(src string) bool { tm.reload()
if _, ok := sources[src]; ok {
return false
}
return true
})
tm.running = true tm.mtx.Unlock()
log.Info("Target manager started.")
}
// handleUpdates receives target group updates and handles them in the tm.wg.Wait()
// context of the given job config.
func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) {
for {
select {
case update, ok := <-ch:
if !ok {
return
}
log.Debugf("Received potential update for target group %q", update.tg.Source)
if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil {
log.Errorf("Error updating targets: %s", err)
}
case <-done:
return
}
}
} }
// Stop all background processing. // Stop all background processing.
func (tm *TargetManager) Stop() { func (tm *TargetManager) Stop() {
tm.mtx.RLock() log.Infoln("Stopping target manager...")
if tm.running {
defer tm.stop(true)
}
// Return the lock before calling tm.stop().
defer tm.mtx.RUnlock()
}
// stop background processing of the target manager. If removeTargets is true,
// existing targets will be stopped and removed.
func (tm *TargetManager) stop(removeTargets bool) {
log.Info("Stopping target manager...")
defer log.Info("Target manager stopped.")
close(tm.done)
tm.mtx.Lock() tm.mtx.Lock()
defer tm.mtx.Unlock() // Cancel the base context, this will cause all target providers to shut down
// and all in-flight scrapes to abort immmediately.
// Started inserts will be finished before terminating.
tm.cancel()
tm.mtx.Unlock()
if removeTargets { // Wait for all scrape inserts to complete.
tm.removeTargets(nil) tm.wg.Wait()
}
tm.running = false log.Debugln("Target manager stopped")
} }
// removeTargets stops and removes targets for sources where f(source) is true func (tm *TargetManager) reload() {
// or if f is nil. This method is not thread-safe. jobs := map[string]struct{}{}
func (tm *TargetManager) removeTargets(f func(string) bool) {
if f == nil {
f = func(string) bool { return true }
}
var wg sync.WaitGroup
for src, targets := range tm.targets {
if !f(src) {
continue
}
wg.Add(len(targets))
for _, target := range targets {
go func(t *Target) {
t.StopScraper()
wg.Done()
}(target)
}
delete(tm.targets, src)
}
wg.Wait()
}
// updateTargetGroup creates new targets for the group and replaces the old targets // Start new target sets and update existing ones.
// for the source ID. for _, scfg := range tm.scrapeConfigs {
func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error { jobs[scfg.JobName] = struct{}{}
newTargets, err := tm.targetsFromGroup(tgroup, cfg)
if err != nil { ts, ok := tm.targetSets[scfg.JobName]
return err if !ok {
ts = newTargetSet(scfg, tm.appender)
tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1)
go func(ts *targetSet) {
ts.runScraping(tm.ctx)
tm.wg.Done()
}(ts)
} else {
ts.reload(scfg)
}
ts.runProviders(tm.ctx, providersFromConfig(scfg))
} }
tm.mtx.Lock() // Remove old target sets. Waiting for stopping is already guaranteed
defer tm.mtx.Unlock() // by the goroutine that started the target set.
for name, ts := range tm.targetSets {
if !tm.running { if _, ok := jobs[name]; !ok {
return nil ts.cancel()
} delete(tm.targetSets, name)
oldTargets, ok := tm.targets[tgroup.Source]
if ok {
var wg sync.WaitGroup
// Replace the old targets with the new ones while keeping the state
// of intersecting targets.
for i, tnew := range newTargets {
var match *Target
for j, told := range oldTargets {
if told == nil {
continue
}
if tnew.fingerprint() == told.fingerprint() {
match = told
oldTargets[j] = nil
break
}
}
// Update the existing target and discard the new equivalent.
// Otherwise start scraping the new target.
if match != nil {
// Updating is blocked during a scrape. We don't want those wait times
// to build up.
wg.Add(1)
go func(t *Target) {
if err := match.Update(cfg, t.labels, t.metaLabels); err != nil {
log.Errorf("Error updating target %v: %v", t, err)
}
wg.Done()
}(tnew)
newTargets[i] = match
} else {
go tnew.RunScraper(tm.sampleAppender)
}
}
// Remove all old targets that disappeared.
for _, told := range oldTargets {
if told != nil {
wg.Add(1)
go func(t *Target) {
t.StopScraper()
wg.Done()
}(told)
}
}
wg.Wait()
} else {
// The source ID is new, start all target scrapers.
for _, tnew := range newTargets {
go tnew.RunScraper(tm.sampleAppender)
} }
} }
if len(newTargets) > 0 {
tm.targets[tgroup.Source] = newTargets
} else {
delete(tm.targets, tgroup.Source)
}
return nil
} }
// Pools returns the targets currently being scraped bucketed by their job name. // Pools returns the targets currently being scraped bucketed by their job name.
@ -321,11 +140,16 @@ func (tm *TargetManager) Pools() map[string][]*Target {
pools := map[string][]*Target{} pools := map[string][]*Target{}
for _, ts := range tm.targets { // TODO(fabxc): this is just a hack to maintain compatibility for now.
for _, t := range ts { for _, ps := range tm.targetSets {
ps.scrapePool.mtx.RLock()
for _, t := range ps.scrapePool.targets {
job := string(t.Labels()[model.JobLabel]) job := string(t.Labels()[model.JobLabel])
pools[job] = append(pools[job], t) pools[job] = append(pools[job], t)
} }
ps.scrapePool.mtx.RUnlock()
} }
return pools return pools
} }
@ -334,79 +158,196 @@ func (tm *TargetManager) Pools() map[string][]*Target {
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged. // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
// Returns true on success. // Returns true on success.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
tm.mtx.RLock()
running := tm.running
tm.mtx.RUnlock()
if running {
tm.stop(false)
// Even if updating the config failed, we want to continue rather than stop scraping anything.
defer tm.Run()
}
providers := map[*config.ScrapeConfig][]TargetProvider{}
for _, scfg := range cfg.ScrapeConfigs {
providers[scfg] = providersFromConfig(scfg)
}
tm.mtx.Lock() tm.mtx.Lock()
defer tm.mtx.Unlock() defer tm.mtx.Unlock()
tm.providers = providers tm.scrapeConfigs = cfg.ScrapeConfigs
if tm.ctx != nil {
tm.reload()
}
return true return true
} }
// prefixedTargetProvider wraps TargetProvider and prefixes source strings // targetSet holds several TargetProviders for which the same scrape configuration
// to make the sources unique across a configuration. // is used. It maintains target groups from all given providers and sync them
type prefixedTargetProvider struct { // to a scrape pool.
TargetProvider type targetSet struct {
mtx sync.RWMutex
job string // Sets of targets by a source string that is unique across target providers.
mechanism string tgroups map[string][]*Target
idx int providers map[string]TargetProvider
scrapePool *scrapePool
config *config.ScrapeConfig
syncCh chan struct{}
cancelScraping func()
cancelProviders func()
} }
func (tp *prefixedTargetProvider) prefix(src string) string { func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src) ts := &targetSet{
} tgroups: map[string][]*Target{},
scrapePool: newScrapePool(cfg, app),
func (tp *prefixedTargetProvider) Sources() []string { syncCh: make(chan struct{}, 1),
srcs := tp.TargetProvider.Sources() config: cfg,
for i, src := range srcs {
srcs[i] = tp.prefix(src)
} }
return ts
return srcs
} }
func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (ts *targetSet) cancel() {
defer close(ch) ts.mtx.RLock()
defer ts.mtx.RUnlock()
ch2 := make(chan config.TargetGroup) if ts.cancelScraping != nil {
go tp.TargetProvider.Run(ch2, done) ts.cancelScraping()
}
if ts.cancelProviders != nil {
ts.cancelProviders()
}
}
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
ts.mtx.Lock()
ts.config = cfg
ts.mtx.Unlock()
ts.scrapePool.reload(cfg)
}
func (ts *targetSet) runScraping(ctx context.Context) {
ctx, ts.cancelScraping = context.WithCancel(ctx)
ts.scrapePool.ctx = ctx
Loop:
for { for {
// Throttle syncing to once per five seconds.
select { select {
case <-done: case <-ctx.Done():
return break Loop
case tg := <-ch2: case <-time.After(5 * time.Second):
tg.Source = tp.prefix(tg.Source) }
ch <- tg
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.mtx.RLock()
ts.sync()
ts.mtx.RUnlock()
} }
} }
// We want to wait for all pending target scrapes to complete though to ensure there'll
// be no more storage writes after this point.
ts.scrapePool.stop()
}
func (ts *targetSet) sync() {
var all []*Target
for _, targets := range ts.tgroups {
all = append(all, targets...)
}
ts.scrapePool.sync(all)
}
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
// is retrieved and applied.
// We could release earlier with some tweaks, but this is easier to reason about.
ts.mtx.Lock()
defer ts.mtx.Unlock()
var wg sync.WaitGroup
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go func(name string, prov TargetProvider) {
var initial []*config.TargetGroup
select {
case <-ctx.Done():
wg.Done()
return
case initial = <-updates:
// First set of all targets the provider knows.
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
for _, tgroup := range initial {
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
continue
}
ts.tgroups[name+"/"+tgroup.Source] = targets
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs := <-updates:
for _, tg := range tgs {
if err := ts.update(name, tg); err != nil {
log.With("target_group", tg).Errorf("Target update failed: %s", err)
}
}
}
}
}(name, prov)
go prov.Run(ctx, updates)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
ts.sync()
}
// update handles a target group update from a target provider identified by the name.
func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
return err
}
ts.mtx.Lock()
defer ts.mtx.Unlock()
ts.tgroups[name+"/"+tgroup.Source] = targets
select {
case ts.syncCh <- struct{}{}:
default:
}
return nil
} }
// providersFromConfig returns all TargetProviders configured in cfg. // providersFromConfig returns all TargetProviders configured in cfg.
func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
var providers []TargetProvider providers := map[string]TargetProvider{}
app := func(mech string, i int, tp TargetProvider) { app := func(mech string, i int, tp TargetProvider) {
providers = append(providers, &prefixedTargetProvider{ providers[fmt.Sprintf("%s/%d", mech, i)] = tp
job: cfg.JobName,
mechanism: mech,
idx: i,
TargetProvider: tp,
})
} }
for i, c := range cfg.DNSSDConfigs { for i, c := range cfg.DNSSDConfigs {
@ -451,11 +392,9 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
} }
// targetsFromGroup builds targets based on the given TargetGroup and config. // targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targets := make([]*Target, 0, len(tg.Targets)) targets := make([]*Target, 0, len(tg.Targets))
for i, labels := range tg.Targets { for i, labels := range tg.Targets {
for k, v := range cfg.Params { for k, v := range cfg.Params {
if len(v) > 0 { if len(v) > 0 {
@ -518,11 +457,12 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
delete(labels, ln) delete(labels, ln)
} }
} }
tr, err := NewTarget(cfg, labels, preRelabelLabels)
if err != nil { if _, ok := labels[model.InstanceLabel]; !ok {
return nil, fmt.Errorf("error while creating instance %d in target group %s: %s", i, tg, err) labels[model.InstanceLabel] = labels[model.AddressLabel]
} }
targets = append(targets, tr)
targets = append(targets, NewTarget(labels, preRelabelLabels, cfg.Params))
} }
return targets, nil return targets, nil
@ -539,29 +479,16 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups { for i, tg := range groups {
tg.Source = fmt.Sprintf("%d", i) tg.Source = fmt.Sprintf("%d", i)
} }
return &StaticProvider{ return &StaticProvider{groups}
TargetGroups: groups,
}
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) // We still have to consider that the consumer exits right away in which case
// the context will be canceled.
for _, tg := range sd.TargetGroups { select {
select { case ch <- sd.TargetGroups:
case <-done: case <-ctx.Done():
return
case ch <- *tg:
}
} }
<-done close(ch)
}
// Sources returns the provider's sources.
func (sd *StaticProvider) Sources() (srcs []string) {
for _, tg := range sd.TargetGroups {
srcs = append(srcs, tg.Source)
}
return srcs
} }

View File

@ -12,492 +12,3 @@
// limitations under the License. // limitations under the License.
package retrieval package retrieval
import (
"net/url"
"reflect"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
func TestPrefixedTargetProvider(t *testing.T) {
targetGroups := []*config.TargetGroup{
{
Targets: []model.LabelSet{
{model.AddressLabel: "test-1:1234"},
},
}, {
Targets: []model.LabelSet{
{model.AddressLabel: "test-1:1235"},
},
},
}
tp := &prefixedTargetProvider{
job: "job-x",
mechanism: "static",
idx: 123,
TargetProvider: NewStaticProvider(targetGroups),
}
expSources := []string{
"job-x:static:123:0",
"job-x:static:123:1",
}
if !reflect.DeepEqual(tp.Sources(), expSources) {
t.Fatalf("expected sources %v, got %v", expSources, tp.Sources())
}
ch := make(chan config.TargetGroup)
done := make(chan struct{})
defer close(done)
go tp.Run(ch, done)
expGroup1 := *targetGroups[0]
expGroup2 := *targetGroups[1]
expGroup1.Source = "job-x:static:123:0"
expGroup2.Source = "job-x:static:123:1"
// The static target provider sends on the channel once per target group.
if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) {
t.Fatalf("expected target group %v, got %v", expGroup1, tg)
}
if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) {
t.Fatalf("expected target group %v, got %v", expGroup2, tg)
}
}
func TestTargetManagerChan(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{{
Targets: []model.LabelSet{
{model.AddressLabel: "example.org:80"},
{model.AddressLabel: "example.com:80"},
},
}},
}
prov1 := &fakeTargetProvider{
sources: []string{"src1", "src2"},
update: make(chan *config.TargetGroup),
}
targetManager := &TargetManager{
sampleAppender: nopAppender{},
providers: map[*config.ScrapeConfig][]TargetProvider{
testJob1: {prov1},
},
targets: make(map[string][]*Target),
}
go targetManager.Run()
defer targetManager.Stop()
sequence := []struct {
tgroup *config.TargetGroup
expected map[string][]model.LabelSet
}{
{
tgroup: &config.TargetGroup{
Source: "src1",
Targets: []model.LabelSet{
{model.AddressLabel: "test-1:1234"},
{model.AddressLabel: "test-2:1234", "label": "set"},
{model.AddressLabel: "test-3:1234"},
},
},
expected: map[string][]model.LabelSet{
"src1": {
{model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src2",
Targets: []model.LabelSet{
{model.AddressLabel: "test-1:1235"},
{model.AddressLabel: "test-2:1235"},
{model.AddressLabel: "test-3:1235"},
},
Labels: model.LabelSet{"group": "label"},
},
expected: map[string][]model.LabelSet{
"src1": {
{model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"},
},
"src2": {
{model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1235", "group": "label"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1235", "group": "label"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1235", "group": "label"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src2",
Targets: []model.LabelSet{},
},
expected: map[string][]model.LabelSet{
"src1": {
{model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"},
},
},
}, {
tgroup: &config.TargetGroup{
Source: "src1",
Targets: []model.LabelSet{
{model.AddressLabel: "test-1:1234", "added": "label"},
{model.AddressLabel: "test-3:1234"},
{model.AddressLabel: "test-4:1234", "fancy": "label"},
},
},
expected: map[string][]model.LabelSet{
"src1": {
{model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234", "added": "label"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"},
{model.JobLabel: "test_job1", model.InstanceLabel: "test-4:1234", "fancy": "label"},
},
},
},
}
for i, step := range sequence {
prov1.update <- step.tgroup
time.Sleep(20 * time.Millisecond)
if len(targetManager.targets) != len(step.expected) {
t.Fatalf("step %d: sources mismatch %v, %v", i, targetManager.targets, step.expected)
}
for source, actTargets := range targetManager.targets {
expTargets, ok := step.expected[source]
if !ok {
t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets)
}
for _, expt := range expTargets {
found := false
for _, actt := range actTargets {
if reflect.DeepEqual(expt, actt.Labels()) {
found = true
break
}
}
if !found {
t.Errorf("step %d: expected target %v not found in actual targets", i, expt)
}
}
}
}
}
func TestTargetManagerConfigUpdate(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: model.Duration(1 * time.Minute),
Params: url.Values{
"testParam": []string{"paramValue", "secondValue"},
},
TargetGroups: []*config.TargetGroup{{
Targets: []model.LabelSet{
{model.AddressLabel: "example.org:80"},
{model.AddressLabel: "example.com"},
},
}},
RelabelConfigs: []*config.RelabelConfig{
{
// Copy out the URL parameter.
SourceLabels: model.LabelNames{"__param_testParam"},
Regex: config.MustNewRegexp("(.*)"),
TargetLabel: "testParam",
Replacement: "$1",
Action: config.RelabelReplace,
},
{
// The port number is added after relabeling, so
// this relabel rule should have no effect.
SourceLabels: model.LabelNames{model.AddressLabel},
Regex: config.MustNewRegexp("example.com:80"),
Action: config.RelabelDrop,
},
},
}
testJob2 := &config.ScrapeConfig{
JobName: "test_job2",
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{
{
Targets: []model.LabelSet{
{model.AddressLabel: "example.org:8080"},
{model.AddressLabel: "example.com:8081"},
},
Labels: model.LabelSet{
"foo": "bar",
"boom": "box",
},
},
{
Targets: []model.LabelSet{
{model.AddressLabel: "test.com:1234"},
},
},
{
Targets: []model.LabelSet{
{model.AddressLabel: "test.com:1235"},
},
Labels: model.LabelSet{"instance": "fixed"},
},
},
RelabelConfigs: []*config.RelabelConfig{
{
SourceLabels: model.LabelNames{model.AddressLabel},
Regex: config.MustNewRegexp(`test\.(.*?):(.*)`),
Replacement: "foo.${1}:${2}",
TargetLabel: model.AddressLabel,
Action: config.RelabelReplace,
},
{
// Add a new label for example.* targets.
SourceLabels: model.LabelNames{model.AddressLabel, "boom", "foo"},
Regex: config.MustNewRegexp("example.*?-b([a-z-]+)r"),
TargetLabel: "new",
Replacement: "$1",
Separator: "-",
Action: config.RelabelReplace,
},
{
// Drop an existing label.
SourceLabels: model.LabelNames{"boom"},
Regex: config.MustNewRegexp(".*"),
TargetLabel: "boom",
Replacement: "",
Action: config.RelabelReplace,
},
},
}
// Test that targets without host:port addresses are dropped.
testJob3 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{{
Targets: []model.LabelSet{
{model.AddressLabel: "example.net:80"},
},
}},
RelabelConfigs: []*config.RelabelConfig{
{
SourceLabels: model.LabelNames{model.AddressLabel},
Regex: config.MustNewRegexp("(.*)"),
TargetLabel: "__address__",
Replacement: "http://$1",
Action: config.RelabelReplace,
},
},
}
sequence := []struct {
scrapeConfigs []*config.ScrapeConfig
expected map[string][]model.LabelSet
}{
{
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]model.LabelSet{
"test_job1:static:0:0": {
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.org:80",
model.ParamLabelPrefix + "testParam": "paramValue",
},
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.com:80",
model.ParamLabelPrefix + "testParam": "paramValue"},
},
},
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]model.LabelSet{
"test_job1:static:0:0": {
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.org:80",
model.ParamLabelPrefix + "testParam": "paramValue",
},
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.com:80",
model.ParamLabelPrefix + "testParam": "paramValue",
},
},
},
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1, testJob2},
expected: map[string][]model.LabelSet{
"test_job1:static:0:0": {
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.org:80",
model.ParamLabelPrefix + "testParam": "paramValue",
},
{
model.JobLabel: "test_job1",
"testParam": "paramValue",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.com:80",
model.ParamLabelPrefix + "testParam": "paramValue",
},
},
"test_job2:static:0:0": {
{
model.JobLabel: "test_job2",
"foo": "bar",
"new": "ox-ba",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.org:8080",
},
{
model.JobLabel: "test_job2",
"foo": "bar",
"new": "ox-ba",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.com:8081",
},
},
"test_job2:static:0:1": {
{
model.JobLabel: "test_job2",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "foo.com:1234",
},
},
"test_job2:static:0:2": {
{
model.JobLabel: "test_job2",
model.InstanceLabel: "fixed",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "foo.com:1235",
},
},
},
}, {
scrapeConfigs: []*config.ScrapeConfig{},
expected: map[string][]model.LabelSet{},
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob2},
expected: map[string][]model.LabelSet{
"test_job2:static:0:0": {
{
model.JobLabel: "test_job2",
"foo": "bar",
"new": "ox-ba",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.org:8080"},
{
model.JobLabel: "test_job2",
"foo": "bar",
"new": "ox-ba",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "example.com:8081",
},
},
"test_job2:static:0:1": {
{
model.JobLabel: "test_job2",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "foo.com:1234",
},
},
"test_job2:static:0:2": {
{
model.JobLabel: "test_job2",
model.InstanceLabel: "fixed",
model.SchemeLabel: "",
model.MetricsPathLabel: "",
model.AddressLabel: "foo.com:1235",
},
},
},
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob3},
expected: map[string][]model.LabelSet{},
},
}
conf := &config.Config{}
*conf = config.DefaultConfig
targetManager := NewTargetManager(nopAppender{})
targetManager.ApplyConfig(conf)
targetManager.Run()
defer targetManager.Stop()
for i, step := range sequence {
conf.ScrapeConfigs = step.scrapeConfigs
targetManager.ApplyConfig(conf)
time.Sleep(50 * time.Millisecond)
if len(targetManager.targets) != len(step.expected) {
t.Fatalf("step %d: sources mismatch: expected %v, got %v", i, step.expected, targetManager.targets)
}
for source, actTargets := range targetManager.targets {
expTargets, ok := step.expected[source]
if !ok {
t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets)
}
for _, expt := range expTargets {
found := false
for _, actt := range actTargets {
if reflect.DeepEqual(expt, actt.labels) {
found = true
break
}
}
if !found {
t.Errorf("step %d: expected target %v for %q not found in actual targets", i, expt, source)
}
}
}
}
}
func TestHandleUpdatesReturnsWhenUpdateChanIsClosed(t *testing.T) {
tm := NewTargetManager(nopAppender{})
ch := make(chan targetGroupUpdate)
close(ch)
tm.handleUpdates(ch, make(chan struct{}))
}

View File

@ -159,7 +159,7 @@ func webUiTemplatesGraphHtml() (*asset, error) {
return a, nil return a, nil
} }
var _webUiTemplatesStatusHtml = []byte("\x1f\x8b\x08\x00\x00\x09\x6e\x88\x00\xff\xcc\x57\xcd\x6e\xdc\x36\x10\xbe\xef\x53\xb0\x44\x8e\xd5\x2e\x10\xa0\x17\x63\x57\x07\x1b\x29\x1c\xc0\x29\xdc\xac\x7d\xe9\x25\xe0\x8a\xb3\x12\x5b\x9a\x14\x48\xca\xb5\xa1\xe8\xdd\x3b\x43\x49\x5e\xfd\x6d\x9a\x34\x68\xeb\xcb\x9a\x43\x0e\xe7\xe7\x9b\x6f\x46\x74\x5d\x4b\x38\x2a\x03\x8c\x17\x20\x24\x6f\x9a\xed\x0f\x49\xc2\x8c\x7a\x62\x49\x92\xd6\x35\x18\xd9\x34\xab\xd5\x49\x2b\xb3\x26\x80\x09\xa8\xb8\x62\x6c\x2b\xd5\x23\xcb\xb4\xf0\x7e\x17\x0f\x04\xaa\xb8\xe4\xa8\x2b\x25\x79\x8a\xe7\xa8\x51\xbc\x65\x4a\xee\xb8\xab\x4c\x50\x0f\xc0\xd3\x8f\xed\x82\xbd\x37\x47\xeb\x1e\x44\x50\xd6\x6c\x37\xc5\xdb\x4e\x3b\x88\x83\x86\xde\x62\x2b\xc4\xdf\x04\xad\x4b\x30\x1e\x64\x27\x1f\xac\x93\xe0\x5e\x44\x1f\x9c\x2a\x5f\xa4\xc2\x3e\x82\xeb\x02\x20\xa3\x07\x2b\x9f\x7b\x89\x64\x77\x12\x48\x2c\xd2\xfb\x92\x62\xda\x6e\x70\x39\x3a\x91\x88\xc0\x7a\x1f\x44\xa8\xfc\xfa\x52\xb9\x50\xac\xef\xef\xae\x10\xa2\x0d\x9e\x9c\xec\x6d\x4e\x06\x71\x7d\x72\x86\x02\x85\x93\xae\x46\x48\x1c\x2a\xa5\xa5\x3a\x65\xcf\xd3\x4b\xda\xf9\x1f\x01\xa9\x6b\x27\x4c\x0e\xec\xcd\x1f\xf0\xfc\x23\x7b\xf3\x28\x74\x05\xec\x62\xc7\xd6\x14\x52\xac\xf3\x39\xe0\x98\xcf\x6c\x09\x58\x5d\xfb\x27\x47\xa8\xc8\x40\x44\x67\x01\xc6\xd6\xec\x97\xb0\xa3\x40\x5a\xba\x7d\x35\x96\x08\xc2\x51\xe5\x95\xeb\x80\xbc\x1a\x8a\x03\x10\x4b\x07\x83\x42\xb6\x5a\x14\x09\xed\xaf\x26\x34\xd5\xe0\x89\xa4\xf8\x67\x66\xa0\x45\x29\x13\x5a\xb3\xde\x56\x54\x6c\x1a\x34\x7e\x7d\xf7\xe1\x66\x6f\x54\x59\x42\x60\xa5\x08\xc5\xad\xc3\x86\x79\x42\x2f\x07\xb7\xe9\xfb\x68\xc9\x63\x10\x2e\x87\x80\x3e\xef\xda\xc5\xc9\xeb\xbf\x54\xfd\x41\xbd\x7f\xb7\x07\xac\x77\x69\xad\xa6\x72\x8f\x12\x6b\xa3\xb9\xc5\x23\x3f\x60\x40\x2c\x3a\x8e\x89\x61\x79\x5b\x5e\x10\x19\x32\x54\x2e\x85\xd9\xf1\x9f\x78\x1f\x33\x7a\xf8\x44\x17\xc8\x3f\x72\x00\xc5\x8e\x1f\xe3\xc2\x2f\xb0\xab\x73\x96\xbe\x33\xb2\xb4\xca\x84\x29\xab\xfa\x73\x8a\x77\xd6\xb9\xfd\xe1\x8d\x38\x80\xf6\xe7\x4f\x7d\x60\xfb\xcc\x89\xf2\xac\x81\x77\xce\x59\x37\x3f\x9c\x46\x4f\x1a\x13\x58\xa6\x4d\x36\x80\x9d\x00\x1f\x81\x7a\x26\x79\x39\xdb\x12\xac\x40\x5a\xed\x38\xf2\xed\xfe\xe3\x0d\xfb\xcc\x72\x6d\x0f\x42\xe3\xba\x69\x08\x60\xda\x5d\xef\xb3\x02\x1e\xb0\xd3\x2e\x36\x9b\x6e\xe7\xda\xfa\x10\x49\x4a\xc2\x2d\x92\x93\x8a\x20\x52\xa4\xe6\xd4\xc3\x20\x4a\x4d\xd8\xf5\xe3\xc0\xc7\x79\x40\xd7\x7f\xad\xc0\x3d\xb3\x49\xf8\x93\xab\x6a\x38\x45\x3a\x03\x8b\x37\x30\x25\x62\x4c\xcf\x96\xe8\x92\xc5\xdf\xa4\x74\xea\x41\xb8\xe7\x48\x9b\xb8\xd3\x34\x94\x77\x3f\x46\xf8\x76\x43\x37\xe7\xf1\x4f\xa7\xc8\xdf\xed\x8f\xe7\xd1\x59\xe8\x27\x91\x0a\x0d\x2e\xb0\xf8\x9b\xd4\xf5\x4b\xd7\x5c\x83\xd0\xd8\x08\x9f\x59\x11\x17\x77\xf6\x8a\xd4\x11\x2d\xe6\x89\xa6\x9f\x94\x91\x2a\x13\xc1\x3a\x16\xe0\x29\x24\x15\x4e\x0b\x97\x09\x0f\x7c\x39\x8f\xb1\xd9\x85\x94\x96\x41\xf8\x67\x29\x65\x95\xf3\xd6\x25\xb1\xd9\xb0\x5d\x99\x14\x41\x24\xc1\xe6\xb9\xc6\x01\x1f\x90\xb2\x41\x95\x9c\x05\x15\x48\xee\x8e\xad\x53\xb9\x32\x42\x27\xdd\xf6\x25\xe0\x37\x0c\x98\x83\x58\x31\x65\xf2\x0b\xca\xe2\x03\x04\xd1\x76\x22\xb1\x74\x31\xd3\xb6\xc4\x91\x65\x71\x76\xb5\xea\x6c\xdd\xfd\xa5\x39\xc2\x19\x57\x06\x61\x34\x19\xf0\x2f\xd3\x6f\xc4\xdc\x48\x41\xdd\x79\xff\x0f\x29\xa8\x3d\x7c\xab\x3f\x7c\x62\x89\x4a\x07\x9e\x1a\x6b\xe0\xdb\xf9\xfd\x9d\x64\xa8\x6b\x75\x7c\x21\x32\x8d\xc6\x76\x32\xae\xdf\xfb\xdf\xc0\xe1\x33\xe0\x17\xc0\xaf\x48\x9f\x58\x5d\x7b\x85\x85\x58\xd0\x47\xae\x8b\xdc\x7e\x67\xaf\xcd\x62\x89\x73\x78\x29\xe7\x73\x4d\x29\x89\x0a\x6e\xda\x76\x7c\xf0\x0c\x18\x98\x3d\x87\xf5\xd7\x66\x31\xfd\x1c\xcc\xef\x8d\xde\x32\x73\x95\xe5\xd7\x0d\x06\xef\x42\x55\x1e\xb5\xc8\xf1\x7d\xb0\x6f\x25\xf6\x33\x89\xaf\xe5\x85\xd8\x61\x19\x63\x7a\x6d\x2f\x45\x5a\xe2\xff\x27\xe9\xaa\x57\xfe\x2b\x00\x00\xff\xff\xb2\x36\x91\x1f\xeb\x0c\x00\x00") var _webUiTemplatesStatusHtml = []byte("\x1f\x8b\x08\x00\x00\x09\x6e\x88\x00\xff\xcc\x57\xcd\x6e\xdc\x36\x10\xbe\xef\x53\xb0\x44\x8e\xd5\x2e\x10\xa0\x17\x63\x57\x07\x1b\x29\x1c\xc0\x29\xdc\xac\x7d\xe9\x25\xe0\x8a\xb3\x12\x5b\x9a\x14\x48\xca\xf5\x42\xd1\xbb\x77\x86\x92\xa2\x9f\xd5\xa6\x49\x83\xb6\xb9\xac\x39\xe4\x70\x7e\xbe\xf9\x66\x44\xd7\xb5\x84\xa3\x32\xc0\x78\x01\x42\xf2\xa6\xd9\xfe\x90\x24\xcc\xa8\x17\x96\x24\x69\x5d\x83\x91\x4d\xb3\x5a\x0d\x5a\x99\x35\x01\x4c\x40\xc5\x15\x63\x5b\xa9\x9e\x59\xa6\x85\xf7\xbb\x78\x20\x50\xc5\x25\x47\x5d\x29\xc9\x53\x3c\x47\x8d\xe2\x35\x53\x72\xc7\x5d\x65\x82\x7a\x02\x9e\xbe\x6f\x17\xec\xad\x39\x5a\xf7\x24\x82\xb2\x66\xbb\x29\x5e\x77\xda\x41\x1c\x34\xf4\x16\x5b\x21\xfe\x26\x68\x5d\x82\xf1\x20\x3b\xf9\x60\x9d\x04\xf7\x49\xf4\xc1\xa9\xf2\x93\x54\xd8\x67\x70\x5d\x00\x64\xf4\x60\xe5\xa9\x97\x48\x76\x83\x40\x62\x91\x3e\x96\x14\xd3\x76\x83\xcb\xc9\x89\x44\x04\xd6\xfb\x20\x42\xe5\xd7\xd7\xca\x85\x62\xfd\xf8\x70\x83\x10\x6d\xf0\x64\xb0\xb7\x19\x0c\xe2\x7a\x70\x86\x02\x85\x93\xae\x26\x48\x1c\x2a\xa5\xa5\x1a\xb2\xe7\xe9\x35\xed\xfc\x8f\x80\xd4\xb5\x13\x26\x07\xf6\xea\x0f\x38\xfd\xc8\x5e\x3d\x0b\x5d\x01\xbb\xda\xb1\x35\x85\x14\xeb\x7c\x09\x38\xe6\x33\x5b\x02\x56\xd7\xfe\xc9\x11\x2a\x32\x10\xd1\x59\x80\xb1\x35\xfb\x39\xec\x28\x90\x96\x6e\x5f\x8c\x25\x82\x70\x54\x79\xe5\x3a\x20\x6f\xc6\xe2\x08\xc4\xd2\xc1\xa8\x90\xad\x16\x45\x42\xfb\xab\x19\x4d\x35\x78\x22\x29\xfe\x39\x33\xd0\xa2\x94\x09\xad\x59\x6f\x2b\x2a\x36\x0d\x1a\xbf\x7d\x78\x77\xb7\x37\xaa\x2c\x21\xb0\x52\x84\xe2\xde\x61\xc3\xbc\xa0\x97\x83\xdb\xf4\x7d\xb4\xe4\x31\x08\x97\x43\x40\x9f\x0f\xed\x62\xf0\xfa\x2f\x55\x7f\x54\xef\xdf\xed\x01\xeb\x5d\x5a\xab\xa9\xdc\x93\xc4\xda\x68\xee\xf1\xc8\x8f\x18\x10\x8b\x8e\x63\x62\x5c\xde\x96\x17\x44\x86\x0c\x95\x4b\x61\x76\xfc\x27\xde\xc7\x8c\x1e\x3e\xd0\x05\xf2\x8f\x1c\x40\xb1\xe3\xc7\xb4\xf0\x0b\xec\xea\x9c\xa5\x6f\x8c\x2c\xad\x32\x61\xce\xaa\xfe\x9c\xe2\x3d\xeb\xdc\xfe\xf0\x4e\x1c\x40\xfb\xcb\xa7\x3e\xb0\x7d\xe6\x44\x79\xd1\xc0\x1b\xe7\xac\x3b\x3f\x9c\x47\x4f\x1a\x33\x58\xe6\x4d\x36\x82\x9d\x00\x9f\x80\x7a\x21\x79\x79\xb6\x25\x58\x81\xb4\xda\x71\xe4\xdb\xe3\xfb\x3b\xf6\x91\xe5\xda\x1e\x84\xc6\x75\xd3\x10\xc0\xb4\xbb\xde\x67\x05\x3c\x61\xa7\x5d\x6d\x36\xdd\xce\xad\xf5\x21\x92\x94\x84\x7b\x24\x27\x15\x41\xa4\x48\xcd\xb9\x87\x51\x94\x9a\xb0\xeb\xc7\x81\x8f\xf3\x80\xae\xff\x5a\x81\x3b\xb1\x59\xf8\xb3\xab\x6a\x3c\x45\x3a\x03\x8b\x37\x30\x25\x62\x4c\xcf\x96\xe8\x92\xc5\xdf\xa4\x74\xea\x49\xb8\x53\xa4\x4d\xdc\x69\x1a\xca\xbb\x1f\x23\x7c\xbb\xa1\x9b\xe7\xf1\xcf\xa7\xc8\xdf\xed\x4f\xe7\xd1\x45\xe8\x67\x91\x0a\x0d\x2e\xb0\xf8\x9b\xd4\x35\x5b\xdf\x82\xd0\xd8\x01\x1f\x59\x11\x17\x0f\xf6\x86\xf4\x10\x26\xe6\x89\x9f\x1f\x94\x91\x2a\x13\xc1\x3a\x16\xe0\x25\x24\x15\x8e\x09\x97\x09\x0f\x7c\x39\x81\xce\xde\x42\x12\xcb\x69\xff\xb3\x24\xb2\xca\x79\xeb\x92\xd8\x5e\xd8\xa0\x4c\x8a\x20\x92\x60\xf3\x5c\xe3\x48\x0f\x48\xd2\xa0\x4a\xce\x82\x0a\x24\x77\xc7\xd6\xa9\x5c\x19\xa1\x93\x6e\xfb\x1a\xf0\xab\x05\xcc\x41\xac\x91\x32\xf9\x15\x85\xff\x0e\x82\x68\x7b\x8f\x78\xb9\x98\x62\x5b\xd4\xc8\xab\x38\xad\x5a\x75\xb6\xee\xfe\xd2\xe4\xe0\x8c\x2b\x83\xf8\x99\x0c\xf8\xe7\x09\x37\xe1\x6a\x24\x9d\xee\xbc\xff\x87\xa4\xd3\x1e\xbe\xd6\x1f\x3e\xaa\x44\xa5\x03\x4f\x8d\x35\xf0\xf5\x8c\xfe\x46\x32\xd4\xb5\x3a\x12\xe0\x3e\xb4\x43\x70\xfd\xd6\xff\x06\x0e\xbf\xf8\xbf\x00\x7e\x30\xfa\x8c\xea\xda\x2b\xac\xc0\x58\x11\x69\x2d\x72\xfb\x8d\xfd\x34\x78\x8f\x43\x76\x29\xbd\x4b\x1d\x27\xa9\xea\x6e\xde\x5a\x71\x00\x8e\xec\x5d\xc2\xf3\x4b\xe3\x9e\x0f\xf9\xf3\x7b\x93\x17\xca\xb9\xca\xf2\x9b\x05\xa3\x76\xa1\x2a\x8f\x5a\xe4\xf8\xd5\xdf\xb7\x12\xfb\x99\xc4\xef\xe5\xdd\xd7\xbd\x01\x62\x4c\xdf\xdb\xfb\x8f\x96\xf8\x5f\x47\xba\xea\x95\xff\x0a\x00\x00\xff\xff\xd0\x50\xb7\x0a\xc1\x0c\x00\x00")
func webUiTemplatesStatusHtmlBytes() ([]byte, error) { func webUiTemplatesStatusHtmlBytes() ([]byte, error) {
return bindataRead( return bindataRead(
@ -174,7 +174,7 @@ func webUiTemplatesStatusHtml() (*asset, error) {
return nil, err return nil, err
} }
info := bindataFileInfo{name: "web/ui/templates/status.html", size: 3307, mode: os.FileMode(420), modTime: time.Unix(1455530985, 0)} info := bindataFileInfo{name: "web/ui/templates/status.html", size: 3265, mode: os.FileMode(420), modTime: time.Unix(1456687049, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }

View File

@ -55,8 +55,8 @@
{{end}} {{end}}
</td> </td>
<td> <td>
<span class="alert alert-{{ .Status.Health | healthToClass }} state_indicator text-uppercase"> <span class="alert alert-{{ .Health | healthToClass }} state_indicator text-uppercase">
{{.Status.Health}} {{.Health}}
</span> </span>
</td> </td>
<td> <td>
@ -70,11 +70,11 @@
</span> </span>
</td> </td>
<td> <td>
{{if .Status.LastScrape.IsZero}}Never{{else}}{{since .Status.LastScrape}} ago{{end}} {{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}}
</td> </td>
<td> <td>
{{if .Status.LastError}} {{if .LastError}}
<span class="alert alert-danger state_indicator">{{.Status.LastError}}</span> <span class="alert alert-danger state_indicator">{{.LastError}}</span>
{{end}} {{end}}
</td> </td>
</tr> </tr>