mirror of https://github.com/prometheus/prometheus
Browse Source
This commit adds service discovery using Consul's HTTP API and watches (long polling) to retrieve target updates. (pull/698/head)
Fabian Reinartz
10 years ago
1 changed file with 266 additions and 0 deletions
@ -0,0 +1,266 @@
|
||||
package discovery |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net/http" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/golang/glog" |
||||
|
||||
consul "github.com/hashicorp/consul/api" |
||||
clientmodel "github.com/prometheus/client_golang/model" |
||||
|
||||
"github.com/prometheus/prometheus/config" |
||||
) |
||||
|
||||
const (
	// consulSourcePrefix prefixes every target-group source name emitted
	// by this provider so sources from different mechanisms don't collide.
	consulSourcePrefix = "consul"
	// consulWatchTimeout bounds each blocking (long-polling) watch query.
	consulWatchTimeout = 30 * time.Second
	// consulRetryInterval is the back-off between failed Consul queries.
	consulRetryInterval = 15 * time.Second

	// ConsulNodeLabel is the name for the label containing a target's node name.
	ConsulNodeLabel = clientmodel.MetaLabelPrefix + "consul_node"
	// ConsulTagsLabel is the name of the label containing the tags assigned to the target.
	ConsulTagsLabel = clientmodel.MetaLabelPrefix + "consul_tags"
	// ConsulServiceLabel is the name of the label containing the service name.
	ConsulServiceLabel = clientmodel.MetaLabelPrefix + "consul_service"
	// ConsulDCLabel is the name of the label containing the datacenter ID.
	ConsulDCLabel = clientmodel.MetaLabelPrefix + "consul_dc"
)
||||
|
||||
// ConsulDiscovery retrieves target information from a Consul server
// and updates them via watches.
type ConsulDiscovery struct {
	client       *consul.Client
	clientConf   *consul.Config
	tagSeparator string
	// scrapedServices is the set of service names configured for scraping;
	// services not in this set are ignored. Written only at construction time.
	scrapedServices map[string]struct{}

	// mu guards services and also serializes the watcher goroutines'
	// channel sends against Stop (see Stop for the shutdown protocol).
	mu       sync.RWMutex
	services map[string]*consulService
	// runDone signals Run to terminate; srvsDone (buffered, capacity 1)
	// signals watchServices to terminate.
	runDone, srvsDone chan struct{}
}
||||
|
||||
// consulService contains data belonging to the same service.
type consulService struct {
	name   string
	tgroup *config.TargetGroup
	// lastIndex is the Consul blocking-query index of the last observed
	// update; passed as WaitIndex on the next watch.
	lastIndex uint64
	// removed marks a service that disappeared from the catalog so Run
	// can emit an empty target group for it.
	removed bool
	// running is true once a watchService goroutine has been started.
	running bool
	// done (buffered, capacity 1) signals the service's watcher to stop.
	done chan struct{}
}
||||
|
||||
// NewConsulDiscovery returns a new ConsulDiscovery for the given config.
|
||||
func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery { |
||||
clientConf := &consul.Config{ |
||||
Address: conf.Server, |
||||
Scheme: conf.Scheme, |
||||
Datacenter: conf.Datacenter, |
||||
Token: conf.Token, |
||||
HttpAuth: &consul.HttpBasicAuth{ |
||||
Username: conf.Username, |
||||
Password: conf.Password, |
||||
}, |
||||
} |
||||
client, err := consul.NewClient(clientConf) |
||||
if err != nil { |
||||
// NewClient always returns a nil error.
|
||||
panic(fmt.Errorf("discovery.NewConsulDiscovery: %s", err)) |
||||
} |
||||
cd := &ConsulDiscovery{ |
||||
client: client, |
||||
clientConf: clientConf, |
||||
tagSeparator: conf.TagSeparator, |
||||
runDone: make(chan struct{}), |
||||
srvsDone: make(chan struct{}, 1), |
||||
scrapedServices: map[string]struct{}{}, |
||||
services: map[string]*consulService{}, |
||||
} |
||||
for _, name := range conf.Services { |
||||
cd.scrapedServices[name] = struct{}{} |
||||
} |
||||
return cd |
||||
} |
||||
|
||||
// Sources implements the TargetProvider interface.
|
||||
func (cd *ConsulDiscovery) Sources() []string { |
||||
clientConf := *cd.clientConf |
||||
clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second} |
||||
|
||||
client, err := consul.NewClient(&clientConf) |
||||
if err != nil { |
||||
// NewClient always returns a nil error.
|
||||
panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err)) |
||||
} |
||||
|
||||
srvs, _, err := client.Catalog().Services(nil) |
||||
if err != nil { |
||||
glog.Errorf("Error refreshing service list: %s", err) |
||||
return nil |
||||
} |
||||
cd.mu.Lock() |
||||
defer cd.mu.Unlock() |
||||
|
||||
srcs := make([]string, 0, len(srvs)) |
||||
for name := range srvs { |
||||
if _, ok := cd.scrapedServices[name]; ok { |
||||
srcs = append(srcs, consulSourcePrefix+":"+name) |
||||
} |
||||
} |
||||
return srcs |
||||
} |
||||
|
||||
// Run implements the TargetProvider interface.
|
||||
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) { |
||||
defer close(ch) |
||||
|
||||
update := make(chan *consulService, 10) |
||||
go cd.watchServices(update) |
||||
|
||||
for { |
||||
select { |
||||
case <-cd.runDone: |
||||
return |
||||
case srv := <-update: |
||||
if srv.removed { |
||||
ch <- &config.TargetGroup{Source: consulSourcePrefix + ":" + srv.name} |
||||
break |
||||
} |
||||
// Launch watcher for the service.
|
||||
if !srv.running { |
||||
go cd.watchService(srv, ch) |
||||
srv.running = true |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Stop implements the TargetProvider interface.
func (cd *ConsulDiscovery) Stop() {
	glog.V(1).Infof("Stopping Consul service discovery for %s", cd.clientConf.Address)

	// The lock prevents Run from terminating while the watchers attempt
	// to send on their channels.
	cd.mu.Lock()
	defer cd.mu.Unlock()

	// The watching goroutines will terminate after their next watch timeout.
	// As this can take long, the channel is buffered and we do not wait.
	for _, srv := range cd.services {
		srv.done <- struct{}{}
	}
	// srvsDone is likewise buffered (capacity 1) so this send never blocks.
	cd.srvsDone <- struct{}{}

	// Terminate Run. This send is unbuffered and synchronizes with the
	// receive in Run's select loop.
	cd.runDone <- struct{}{}

	glog.V(1).Infof("Consul service discovery for %s stopped.", cd.clientConf.Address)
}
||||
|
||||
// watchServices retrieves updates from Consul's services endpoint and sends
|
||||
// potential updates to the update channel.
|
||||
func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { |
||||
var lastIndex uint64 |
||||
for { |
||||
catalog := cd.client.Catalog() |
||||
srvs, meta, err := catalog.Services(&consul.QueryOptions{ |
||||
RequireConsistent: false, |
||||
WaitIndex: lastIndex, |
||||
}) |
||||
if err != nil { |
||||
glog.Errorf("Error refreshing service list: %s", err) |
||||
<-time.After(consulRetryInterval) |
||||
continue |
||||
} |
||||
// If the index equals the previous one, the watch timed out with no update.
|
||||
if meta.LastIndex == lastIndex { |
||||
continue |
||||
} |
||||
lastIndex = meta.LastIndex |
||||
|
||||
cd.mu.Lock() |
||||
select { |
||||
case <-cd.srvsDone: |
||||
return |
||||
default: |
||||
// Continue.
|
||||
} |
||||
// Check for new services.
|
||||
for name := range srvs { |
||||
if _, ok := cd.scrapedServices[name]; !ok { |
||||
continue |
||||
} |
||||
srv, ok := cd.services[name] |
||||
if !ok { |
||||
srv = &consulService{ |
||||
name: name, |
||||
tgroup: &config.TargetGroup{}, |
||||
done: make(chan struct{}, 1), |
||||
} |
||||
srv.tgroup.Source = consulSourcePrefix + ":" + name |
||||
cd.services[name] = srv |
||||
} |
||||
srv.tgroup.Labels = clientmodel.LabelSet{ |
||||
ConsulServiceLabel: clientmodel.LabelValue(name), |
||||
ConsulDCLabel: clientmodel.LabelValue(cd.clientConf.Datacenter), |
||||
} |
||||
update <- srv |
||||
} |
||||
// Check for removed services.
|
||||
for name, srv := range cd.services { |
||||
if _, ok := srvs[name]; !ok { |
||||
srv.removed = true |
||||
update <- srv |
||||
srv.done <- struct{}{} |
||||
delete(cd.services, name) |
||||
} |
||||
} |
||||
cd.mu.Unlock() |
||||
} |
||||
} |
||||
|
||||
// watchService retrieves updates about srv from Consul's service endpoint.
|
||||
// On a potential update the resulting target group is sent to ch.
|
||||
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { |
||||
catalog := cd.client.Catalog() |
||||
for { |
||||
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ |
||||
WaitIndex: srv.lastIndex, |
||||
WaitTime: consulWatchTimeout, |
||||
}) |
||||
if err != nil { |
||||
glog.Errorf("Error refreshing service %s: %s", srv.name, err) |
||||
<-time.After(consulRetryInterval) |
||||
continue |
||||
} |
||||
// If the index equals the previous one, the watch timed out with no update.
|
||||
if meta.LastIndex == srv.lastIndex { |
||||
continue |
||||
} |
||||
srv.lastIndex = meta.LastIndex |
||||
srv.tgroup.Targets = make([]clientmodel.LabelSet, 0, len(nodes)) |
||||
|
||||
for _, node := range nodes { |
||||
addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) |
||||
tags := strings.Join(node.ServiceTags, cd.tagSeparator) |
||||
|
||||
srv.tgroup.Targets = append(srv.tgroup.Targets, clientmodel.LabelSet{ |
||||
clientmodel.AddressLabel: clientmodel.LabelValue(addr), |
||||
ConsulNodeLabel: clientmodel.LabelValue(node.Node), |
||||
ConsulTagsLabel: clientmodel.LabelValue(tags), |
||||
}) |
||||
} |
||||
cd.mu.Lock() |
||||
select { |
||||
case <-srv.done: |
||||
return |
||||
default: |
||||
// Continue.
|
||||
} |
||||
ch <- srv.tgroup |
||||
cd.mu.Unlock() |
||||
} |
||||
} |
Loading…
Reference in new issue