Browse Source

[Consul] Add health label to metrics (#5313)

Label metrics with the target health using consul's /health endpoint.

Signed-off-by: Mathilde Gilles <m.gilles@criteo.com>
pull/6874/head
Mathilde Gilles 5 years ago committed by GitHub
parent
commit
9b9c58aea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      discovery/consul/consul.go
  2. 60
      discovery/consul/consul_test.go

41
discovery/consul/consul.go

@ -51,6 +51,8 @@ const (
tagsLabel = model.MetaLabelPrefix + "consul_tags"
// serviceLabel is the name of the label containing the service name.
serviceLabel = model.MetaLabelPrefix + "consul_service"
// healthLabel is the name of the label containing the health of the service instance
healthLabel = model.MetaLabelPrefix + "consul_health"
// serviceAddressLabel is the name of the label containing the (optional) service address.
serviceAddressLabel = model.MetaLabelPrefix + "consul_service_address"
//servicePortLabel is the name of the label containing the service port.
@ -440,14 +442,14 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G
go func() {
ticker := time.NewTicker(d.refreshInterval)
var lastIndex uint64
catalog := srv.client.Catalog()
health := srv.client.Health()
for {
select {
case <-ctx.Done():
ticker.Stop()
return
default:
srv.watch(ctx, ch, catalog, &lastIndex)
srv.watch(ctx, ch, health, &lastIndex)
select {
case <-ticker.C:
case <-ctx.Done():
@ -458,7 +460,7 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G
}
// Get updates for a service.
func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, catalog *consul.Catalog, lastIndex *uint64) {
func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, health *consul.Health, lastIndex *uint64) {
level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ","))
t0 := time.Now()
@ -468,7 +470,8 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
AllowStale: srv.discovery.allowStale,
NodeMeta: srv.discovery.watchedNodeMeta,
}
nodes, meta, err := catalog.ServiceMultipleTags(srv.name, srv.tags, opts.WithContext(ctx))
serviceNodes, meta, err := health.ServiceMultipleTags(srv.name, srv.tags, false, opts.WithContext(ctx))
elapsed := time.Since(t0)
serviceRPCDuraion.Observe(elapsed.Seconds())
@ -495,48 +498,48 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
tgroup := targetgroup.Group{
Source: srv.name,
Labels: srv.labels,
Targets: make([]model.LabelSet, 0, len(nodes)),
Targets: make([]model.LabelSet, 0, len(serviceNodes)),
}
for _, node := range nodes {
for _, serviceNode := range serviceNodes {
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
var tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator
var tags = srv.tagSeparator + strings.Join(serviceNode.Service.Tags, srv.tagSeparator) + srv.tagSeparator
// If the service address is not empty it should be used instead of the node address
// since the service may be registered remotely through a different node.
var addr string
if node.ServiceAddress != "" {
addr = net.JoinHostPort(node.ServiceAddress, fmt.Sprintf("%d", node.ServicePort))
if serviceNode.Service.Address != "" {
addr = net.JoinHostPort(serviceNode.Service.Address, fmt.Sprintf("%d", serviceNode.Service.Port))
} else {
addr = net.JoinHostPort(node.Address, fmt.Sprintf("%d", node.ServicePort))
addr = net.JoinHostPort(serviceNode.Node.Address, fmt.Sprintf("%d", serviceNode.Service.Port))
}
labels := model.LabelSet{
model.AddressLabel: model.LabelValue(addr),
addressLabel: model.LabelValue(node.Address),
nodeLabel: model.LabelValue(node.Node),
addressLabel: model.LabelValue(serviceNode.Node.Address),
nodeLabel: model.LabelValue(serviceNode.Node.Node),
tagsLabel: model.LabelValue(tags),
serviceAddressLabel: model.LabelValue(node.ServiceAddress),
servicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)),
serviceIDLabel: model.LabelValue(node.ServiceID),
serviceAddressLabel: model.LabelValue(serviceNode.Service.Address),
servicePortLabel: model.LabelValue(strconv.Itoa(serviceNode.Service.Port)),
serviceIDLabel: model.LabelValue(serviceNode.Service.ID),
healthLabel: model.LabelValue(serviceNode.Checks.AggregatedStatus()),
}
// Add all key/value pairs from the node's metadata as their own labels.
for k, v := range node.NodeMeta {
for k, v := range serviceNode.Node.Meta {
name := strutil.SanitizeLabelName(k)
labels[metaDataLabel+model.LabelName(name)] = model.LabelValue(v)
}
// Add all key/value pairs from the service's metadata as their own labels.
for k, v := range node.ServiceMeta {
for k, v := range serviceNode.Service.Meta {
name := strutil.SanitizeLabelName(k)
labels[serviceMetaDataLabel+model.LabelName(name)] = model.LabelValue(v)
}
// Add all key/value pairs from the service's tagged addresses as their own labels.
for k, v := range node.TaggedAddresses {
for k, v := range serviceNode.Node.TaggedAddresses {
name := strutil.SanitizeLabelName(k)
labels[taggedAddressesLabel+model.LabelName(name)] = model.LabelValue(v)
}

60
discovery/consul/consul_test.go

@ -172,21 +172,47 @@ func TestNonConfiguredService(t *testing.T) {
const (
AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}`
ServiceTestAnswer = `[{
"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e",
"Node": "node1",
"Address": "1.1.1.1",
"Datacenter": "test-dc",
"TaggedAddresses": {"lan":"192.168.10.10","wan":"10.0.10.10"},
"NodeMeta": {"rack_name": "2304"},
"ServiceID": "test",
"ServiceName": "test",
"ServiceMeta": {"version":"1.0.0","environment":"staging"},
"ServiceTags": ["tag1"],
"ServicePort": 3341,
"CreateIndex": 1,
"ModifyIndex": 1
ServiceTestAnswer = `
[{
"Node": {
"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e",
"Node": "node1",
"Address": "1.1.1.1",
"Datacenter": "test-dc",
"TaggedAddresses": {
"lan": "192.168.10.10",
"wan": "10.0.10.10"
},
"Meta": {"rack_name": "2304"},
"CreateIndex": 1,
"ModifyIndex": 1
},
"Service": {
"ID": "test",
"Service": "test",
"Tags": ["tag1"],
"Address": "",
"Meta": {"version":"1.0.0","environment":"stagging"},
"Port": 3341,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"ProxyDestination": "",
"Proxy": {},
"Connect": {},
"CreateIndex": 1,
"ModifyIndex": 1
},
"Checks": [{
"Node": "node1",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing"
}]
}]`
ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}`
)
@ -197,11 +223,11 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
switch r.URL.String() {
case "/v1/agent/self":
response = AgentAnswer
case "/v1/catalog/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=30000ms":
case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=30000ms":
response = ServiceTestAnswer
case "/v1/catalog/service/test?wait=30000ms":
case "/v1/health/service/test?wait=30000ms":
response = ServiceTestAnswer
case "/v1/catalog/service/other?wait=30000ms":
case "/v1/health/service/other?wait=30000ms":
response = `[]`
case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=30000ms":
response = ServicesTestAnswer

Loading…
Cancel
Save