Merge pull request #1151 from prometheus/fix-sd-source-handling

Fix SD mechanism source prefix handling.
pull/1156/head
Julius Volz 9 years ago
commit 288964eeaf

@ -66,7 +66,7 @@ type ConsulDiscovery struct {
// consulService contains data belonging to the same service. // consulService contains data belonging to the same service.
type consulService struct { type consulService struct {
name string name string
tgroup *config.TargetGroup tgroup config.TargetGroup
lastIndex uint64 lastIndex uint64
removed bool removed bool
running bool running bool
@ -143,7 +143,7 @@ func (cd *ConsulDiscovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
defer cd.stop() defer cd.stop()
@ -159,7 +159,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct
close(srv.done) close(srv.done)
// Send clearing update. // Send clearing update.
ch <- &config.TargetGroup{Source: srv.name} ch <- config.TargetGroup{Source: srv.name}
break break
} }
// Launch watcher for the service. // Launch watcher for the service.
@ -219,9 +219,8 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
srv, ok := cd.services[name] srv, ok := cd.services[name]
if !ok { if !ok {
srv = &consulService{ srv = &consulService{
name: name, name: name,
tgroup: &config.TargetGroup{}, done: make(chan struct{}),
done: make(chan struct{}),
} }
srv.tgroup.Source = name srv.tgroup.Source = name
cd.services[name] = srv cd.services[name] = srv
@ -246,7 +245,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
// watchService retrieves updates about srv from Consul's service endpoint. // watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch. // On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) {
catalog := cd.client.Catalog() catalog := cd.client.Catalog()
for { for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{

@ -91,7 +91,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(dd.interval) ticker := time.NewTicker(dd.interval)
@ -119,7 +119,7 @@ func (dd *DNSDiscovery) Sources() []string {
return srcs return srcs
} }
func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(dd.names)) wg.Add(len(dd.names))
for _, name := range dd.names { for _, name := range dd.names {
@ -133,7 +133,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) {
wg.Wait() wg.Wait()
} }
func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error { func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype) response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
@ -141,7 +141,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro
return err return err
} }
tg := &config.TargetGroup{} var tg config.TargetGroup
for _, record := range response.Answer { for _, record := range response.Answer {
target := model.LabelValue("") target := model.LabelValue("")
switch addr := record.(type) { switch addr := record.(type) {

@ -62,7 +62,7 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(ed.interval) ticker := time.NewTicker(ed.interval)
@ -73,7 +73,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- tg ch <- *tg
} }
for { for {
@ -83,7 +83,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- tg ch <- *tg
} }
case <-done: case <-done:
return return

@ -103,7 +103,7 @@ func (fd *FileDiscovery) watchFiles() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
defer fd.stop() defer fd.stop()
@ -188,7 +188,7 @@ func (fd *FileDiscovery) stop() {
// refresh reads all files matching the discovery's patterns and sends the respective // refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. // updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
ref := map[string]int{} ref := map[string]int{}
for _, p := range fd.listFiles() { for _, p := range fd.listFiles() {
tgroups, err := readFile(p) tgroups, err := readFile(p)
@ -199,7 +199,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
continue continue
} }
for _, tg := range tgroups { for _, tg := range tgroups {
ch <- tg ch <- *tg
} }
ref[p] = len(tgroups) ref[p] = len(tgroups)
} }
@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
for i := m; i < n; i++ { for i := m; i < n; i++ {
ch <- &config.TargetGroup{Source: fileSource(f, i)} ch <- config.TargetGroup{Source: fileSource(f, i)}
} }
} }
} }

@ -26,7 +26,7 @@ func testFileSD(t *testing.T, ext string) {
var ( var (
fsd = NewFileDiscovery(&conf) fsd = NewFileDiscovery(&conf)
ch = make(chan *config.TargetGroup) ch = make(chan config.TargetGroup)
done = make(chan struct{}) done = make(chan struct{})
) )
go fsd.Run(ch, done) go fsd.Run(ch, done)

@ -173,25 +173,35 @@ func (kd *Discovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
select { if tg := kd.updateMastersTargetGroup(); tg != nil {
case ch <- kd.updateMastersTargetGroup(): select {
case <-done: case ch <- *tg:
return case <-done:
return
}
} }
select { if tg := kd.updateNodesTargetGroup(); tg != nil {
case ch <- kd.updateNodesTargetGroup(): select {
case <-done: case ch <- *tg:
return case <-done:
return
}
} }
for _, ns := range kd.services { for _, ns := range kd.services {
for _, service := range ns { for _, service := range ns {
tg := kd.addService(service)
if tg == nil {
continue
}
select { select {
case ch <- kd.addService(service): case ch <- *tg:
case <-done: case <-done:
return return
} }
@ -223,8 +233,12 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
} }
} }
if tg == nil {
continue
}
select { select {
case ch <- tg: case ch <- *tg:
case <-done: case <-done:
return return
} }

@ -53,7 +53,7 @@ func (md *MarathonDiscovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for { for {
@ -69,7 +69,7 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan stru
} }
} }
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups() targetMap, err := md.fetchTargetGroups()
if err != nil { if err != nil {
return err return err
@ -77,7 +77,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error
// Update services which are still present // Update services which are still present
for _, tg := range targetMap { for _, tg := range targetMap {
ch <- tg ch <- *tg
} }
// Remove services which did disappear // Remove services which did disappear
@ -85,7 +85,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
log.Debugf("Removing group for %s", source) log.Debugf("Removing group for %s", source)
ch <- &config.TargetGroup{Source: source} ch <- config.TargetGroup{Source: source}
} }
} }

@ -26,8 +26,8 @@ import (
var marathonValidLabel = map[string]string{"prometheus": "yes"} var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) { func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) {
ch := make(chan *config.TargetGroup) ch := make(chan config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{ md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"}, Servers: []string{"http://localhost:8080"},
}) })

@ -67,7 +67,7 @@ type ServersetDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup sdUpdates *chan<- config.TargetGroup
updates chan zookeeperTreeCacheEvent updates chan zookeeperTreeCacheEvent
treeCaches []*zookeeperTreeCache treeCaches []*zookeeperTreeCache
} }
@ -124,7 +124,7 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- tg *sd.sdUpdates <- *tg
} }
} }
@ -134,11 +134,11 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources { for _, targetGroup := range sd.sources {
ch <- targetGroup ch <- *targetGroup
} }
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch

@ -52,12 +52,12 @@ type fakeTargetProvider struct {
update chan *config.TargetGroup update chan *config.TargetGroup
} }
func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (tp *fakeTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for { for {
select { select {
case tg := <-tp.update: case tg := <-tp.update:
ch <- tg ch <- *tg
case <-done: case <-done:
return return
} }

@ -43,7 +43,7 @@ type TargetProvider interface {
// updated target groups. The channel must be closed by the target provider // updated target groups. The channel must be closed by the target provider
// if no more updates will be sent. // if no more updates will be sent.
// On receiving from done Run must return. // On receiving from done Run must return.
Run(up chan<- *config.TargetGroup, done <-chan struct{}) Run(up chan<- config.TargetGroup, done <-chan struct{})
} }
// TargetManager maintains a set of targets, starts and stops their scraping and // TargetManager maintains a set of targets, starts and stops their scraping and
@ -105,7 +105,7 @@ func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGr
// targetGroupUpdate is a potentially changed/new target group // targetGroupUpdate is a potentially changed/new target group
// for the given scrape configuration. // for the given scrape configuration.
type targetGroupUpdate struct { type targetGroupUpdate struct {
tg *config.TargetGroup tg config.TargetGroup
scfg *config.ScrapeConfig scfg *config.ScrapeConfig
} }
@ -126,9 +126,9 @@ func (tm *TargetManager) Run() {
sources[src] = struct{}{} sources[src] = struct{}{}
} }
tgc := make(chan *config.TargetGroup) tgc := make(chan config.TargetGroup)
// Run the target provider after cleanup of the stale targets is done. // Run the target provider after cleanup of the stale targets is done.
defer func(prov TargetProvider, tgc chan<- *config.TargetGroup, done <-chan struct{}) { defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) {
go prov.Run(tgc, done) go prov.Run(tgc, done)
}(prov, tgc, tm.done) }(prov, tgc, tm.done)
@ -140,9 +140,6 @@ func (tm *TargetManager) Run() {
for { for {
select { select {
case tg := <-tgc: case tg := <-tgc:
if tg == nil {
break
}
tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
case <-done: case <-done:
return return
@ -179,12 +176,9 @@ func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan
if !ok { if !ok {
return return
} }
if update.tg == nil {
break
}
log.Debugf("Received potential update for target group %q", update.tg.Source) log.Debugf("Received potential update for target group %q", update.tg.Source)
if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil { if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil {
log.Errorf("Error updating targets: %s", err) log.Errorf("Error updating targets: %s", err)
} }
case <-done: case <-done:
@ -382,10 +376,10 @@ func (tp *prefixedTargetProvider) Sources() []string {
return srcs return srcs
} }
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ch2 := make(chan *config.TargetGroup) ch2 := make(chan config.TargetGroup)
go tp.TargetProvider.Run(ch2, done) go tp.TargetProvider.Run(ch2, done)
for { for {
@ -393,9 +387,6 @@ func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan
case <-done: case <-done:
return return
case tg := <-ch2: case tg := <-ch2:
if tg == nil {
break
}
tg.Source = tp.prefix(tg.Source) tg.Source = tp.prefix(tg.Source)
ch <- tg ch <- tg
} }
@ -537,14 +528,14 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for _, tg := range sd.TargetGroups { for _, tg := range sd.TargetGroups {
select { select {
case <-done: case <-done:
return return
case ch <- tg: case ch <- *tg:
} }
} }
<-done <-done

@ -52,7 +52,7 @@ func TestPrefixedTargetProvider(t *testing.T) {
t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) t.Fatalf("expected sources %v, got %v", expSources, tp.Sources())
} }
ch := make(chan *config.TargetGroup) ch := make(chan config.TargetGroup)
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
@ -64,10 +64,10 @@ func TestPrefixedTargetProvider(t *testing.T) {
expGroup2.Source = "job-x:static:123:1" expGroup2.Source = "job-x:static:123:1"
// The static target provider sends on the channel once per target group. // The static target provider sends on the channel once per target group.
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) { if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) {
t.Fatalf("expected target group %v, got %v", expGroup1, tg) t.Fatalf("expected target group %v, got %v", expGroup1, tg)
} }
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) { if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) {
t.Fatalf("expected target group %v, got %v", expGroup2, tg) t.Fatalf("expected target group %v, got %v", expGroup2, tg)
} }
} }

Loading…
Cancel
Save