From 9b9c58aea8e0043bb944961fb776b48ac602c7f9 Mon Sep 17 00:00:00 2001 From: Mathilde Gilles Date: Tue, 25 Feb 2020 14:32:30 +0100 Subject: [PATCH] [Consul] Add health label to metrics (#5313) Label metrics with the target health using consul's /health endpoint. Signed-off-by: Mathilde Gilles --- discovery/consul/consul.go | 41 +++++++++++----------- discovery/consul/consul_test.go | 60 +++++++++++++++++++++++---------- 2 files changed, 65 insertions(+), 36 deletions(-) diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 9c02eecd1..ff1158a0a 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -51,6 +51,8 @@ const ( tagsLabel = model.MetaLabelPrefix + "consul_tags" // serviceLabel is the name of the label containing the service name. serviceLabel = model.MetaLabelPrefix + "consul_service" + // healthLabel is the name of the label containing the health of the service instance + healthLabel = model.MetaLabelPrefix + "consul_health" // serviceAddressLabel is the name of the label containing the (optional) service address. serviceAddressLabel = model.MetaLabelPrefix + "consul_service_address" //servicePortLabel is the name of the label containing the service port. @@ -440,14 +442,14 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G go func() { ticker := time.NewTicker(d.refreshInterval) var lastIndex uint64 - catalog := srv.client.Catalog() + health := srv.client.Health() for { select { case <-ctx.Done(): ticker.Stop() return default: - srv.watch(ctx, ch, catalog, &lastIndex) + srv.watch(ctx, ch, health, &lastIndex) select { case <-ticker.C: case <-ctx.Done(): @@ -458,7 +460,7 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G } // Get updates for a service. -func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, catalog *consul.Catalog, lastIndex *uint64) { +func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group, health *consul.Health, lastIndex *uint64) { level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tags", strings.Join(srv.tags, ",")) t0 := time.Now() @@ -468,7 +470,8 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr AllowStale: srv.discovery.allowStale, NodeMeta: srv.discovery.watchedNodeMeta, } - nodes, meta, err := catalog.ServiceMultipleTags(srv.name, srv.tags, opts.WithContext(ctx)) + + serviceNodes, meta, err := health.ServiceMultipleTags(srv.name, srv.tags, false, opts.WithContext(ctx)) elapsed := time.Since(t0) serviceRPCDuraion.Observe(elapsed.Seconds()) @@ -495,48 +498,48 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr tgroup := targetgroup.Group{ Source: srv.name, Labels: srv.labels, - Targets: make([]model.LabelSet, 0, len(nodes)), + Targets: make([]model.LabelSet, 0, len(serviceNodes)), } - for _, node := range nodes { - + for _, serviceNode := range serviceNodes { // We surround the separated list with the separator as well. This way regular expressions // in relabeling rules don't have to consider tag positions. - var tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator + var tags = srv.tagSeparator + strings.Join(serviceNode.Service.Tags, srv.tagSeparator) + srv.tagSeparator // If the service address is not empty it should be used instead of the node address // since the service may be registered remotely through a different node. var addr string - if node.ServiceAddress != "" { - addr = net.JoinHostPort(node.ServiceAddress, fmt.Sprintf("%d", node.ServicePort)) + if serviceNode.Service.Address != "" { + addr = net.JoinHostPort(serviceNode.Service.Address, fmt.Sprintf("%d", serviceNode.Service.Port)) } else { - addr = net.JoinHostPort(node.Address, fmt.Sprintf("%d", node.ServicePort)) + addr = net.JoinHostPort(serviceNode.Node.Address, fmt.Sprintf("%d", serviceNode.Service.Port)) } labels := model.LabelSet{ model.AddressLabel: model.LabelValue(addr), - addressLabel: model.LabelValue(node.Address), - nodeLabel: model.LabelValue(node.Node), + addressLabel: model.LabelValue(serviceNode.Node.Address), + nodeLabel: model.LabelValue(serviceNode.Node.Node), tagsLabel: model.LabelValue(tags), - serviceAddressLabel: model.LabelValue(node.ServiceAddress), - servicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)), - serviceIDLabel: model.LabelValue(node.ServiceID), + serviceAddressLabel: model.LabelValue(serviceNode.Service.Address), + servicePortLabel: model.LabelValue(strconv.Itoa(serviceNode.Service.Port)), + serviceIDLabel: model.LabelValue(serviceNode.Service.ID), + healthLabel: model.LabelValue(serviceNode.Checks.AggregatedStatus()), } // Add all key/value pairs from the node's metadata as their own labels. - for k, v := range node.NodeMeta { + for k, v := range serviceNode.Node.Meta { name := strutil.SanitizeLabelName(k) labels[metaDataLabel+model.LabelName(name)] = model.LabelValue(v) } // Add all key/value pairs from the service's metadata as their own labels. - for k, v := range node.ServiceMeta { + for k, v := range serviceNode.Service.Meta { name := strutil.SanitizeLabelName(k) labels[serviceMetaDataLabel+model.LabelName(name)] = model.LabelValue(v) } // Add all key/value pairs from the service's tagged addresses as their own labels. - for k, v := range node.TaggedAddresses { + for k, v := range serviceNode.Node.TaggedAddresses { name := strutil.SanitizeLabelName(k) labels[taggedAddressesLabel+model.LabelName(name)] = model.LabelValue(v) } diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go index d91c58bd0..ae589d88f 100644 --- a/discovery/consul/consul_test.go +++ b/discovery/consul/consul_test.go @@ -172,21 +172,47 @@ func TestNonConfiguredService(t *testing.T) { const ( AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}` - ServiceTestAnswer = `[{ -"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e", -"Node": "node1", -"Address": "1.1.1.1", -"Datacenter": "test-dc", -"TaggedAddresses": {"lan":"192.168.10.10","wan":"10.0.10.10"}, -"NodeMeta": {"rack_name": "2304"}, -"ServiceID": "test", -"ServiceName": "test", -"ServiceMeta": {"version":"1.0.0","environment":"staging"}, -"ServiceTags": ["tag1"], -"ServicePort": 3341, -"CreateIndex": 1, -"ModifyIndex": 1 + ServiceTestAnswer = ` +[{ + "Node": { + "ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e", + "Node": "node1", + "Address": "1.1.1.1", + "Datacenter": "test-dc", + "TaggedAddresses": { + "lan": "192.168.10.10", + "wan": "10.0.10.10" + }, + "Meta": {"rack_name": "2304"}, + "CreateIndex": 1, + "ModifyIndex": 1 + }, + "Service": { + "ID": "test", + "Service": "test", + "Tags": ["tag1"], + "Address": "", + "Meta": {"version":"1.0.0","environment":"stagging"}, + "Port": 3341, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false, + "ProxyDestination": "", + "Proxy": {}, + "Connect": {}, + "CreateIndex": 1, + "ModifyIndex": 1 + }, + "Checks": [{ + "Node": "node1", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing" + }] }]` + ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}` ) @@ -197,11 +223,11 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) { switch r.URL.String() { case "/v1/agent/self": response = AgentAnswer - case "/v1/catalog/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=30000ms": + case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=30000ms": response = ServiceTestAnswer - case "/v1/catalog/service/test?wait=30000ms": + case "/v1/health/service/test?wait=30000ms": response = ServiceTestAnswer - case "/v1/catalog/service/other?wait=30000ms": + case "/v1/health/service/other?wait=30000ms": response = `[]` case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=30000ms": response = ServicesTestAnswer