From d5634fe2a849d46283a7dbcbf6944b5153faf301 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 8 Aug 2017 01:31:38 -0700 Subject: [PATCH] Add support for labels/filters from go-metrics --- agent/agent.go | 4 + agent/config.go | 32 +++++++ agent/config_test.go | 12 +++ agent/consul/catalog_endpoint.go | 9 +- agent/consul/fsm.go | 15 ++- agent/consul/health_endpoint.go | 9 +- agent/consul/rpc.go | 3 +- agent/dns.go | 6 +- agent/http.go | 2 + api/agent.go | 26 ++++++ api/agent_test.go | 22 +++++ command/agent.go | 17 ++-- website/source/api/agent.html.md | 106 ++++++++++++++++++++++ website/source/docs/agent/options.html.md | 18 ++++ 14 files changed, 260 insertions(+), 21 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 98f02b6924..f59a55e4ec 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/consul/agent/systemd" @@ -94,6 +95,9 @@ type Agent struct { // Used for streaming logs to LogWriter *logger.LogWriter + // In-memory sink used for collecting metrics + MemSink *metrics.InmemSink + // delegate is either a *consul.Server or *consul.Client // depending on the configuration delegate delegate diff --git a/agent/config.go b/agent/config.go index e9e5b9966b..116835cf9a 100644 --- a/agent/config.go +++ b/agent/config.go @@ -219,6 +219,16 @@ type Telemetry struct { // DisableHostname will disable hostname prefixing for all metrics DisableHostname bool `mapstructure:"disable_hostname"` + // PrefixFilter is a list of filter rules to apply for allowing/blocking metrics + // by prefix. + PrefixFilter []string `mapstructure:"prefix_filter"` + AllowedPrefixes []string `mapstructure:"-" json:"-"` + BlockedPrefixes []string `mapstructure:"-" json:"-"` + + // FilterDefault is the default for whether to allow a metric that's not + // covered by the filter. + FilterDefault *bool `mapstructure:"filter_default"` + // DogStatsdAddr is the address of a dogstatsd instance. If provided, // metrics will be sent to that instance DogStatsdAddr string `mapstructure:"dogstatsd_addr"` @@ -937,6 +947,7 @@ func DefaultConfig() *Config { }, Telemetry: Telemetry{ StatsitePrefix: "consul", + FilterDefault: Bool(true), }, Meta: make(map[string]string), SyslogFacility: "LOCAL0", @@ -1461,6 +1472,21 @@ func DecodeConfig(r io.Reader) (*Config, error) { result.EnableACLReplication = true } + // Parse the metric filters + for _, rule := range result.Telemetry.PrefixFilter { + if rule == "" { + return nil, fmt.Errorf("Cannot have empty filter rule in prefix_filter") + } + switch rule[0] { + case '+': + result.Telemetry.AllowedPrefixes = append(result.Telemetry.AllowedPrefixes, rule[1:]) + case '-': + result.Telemetry.BlockedPrefixes = append(result.Telemetry.BlockedPrefixes, rule[1:]) + default: + return nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %s", rule) + } + } + return &result, nil } @@ -1755,6 +1781,12 @@ func MergeConfig(a, b *Config) *Config { if b.Telemetry.DisableHostname == true { result.Telemetry.DisableHostname = true } + if len(b.Telemetry.PrefixFilter) != 0 { + result.Telemetry.PrefixFilter = append(result.Telemetry.PrefixFilter, b.Telemetry.PrefixFilter...) + } + if b.Telemetry.FilterDefault != nil { + result.Telemetry.FilterDefault = b.Telemetry.FilterDefault + } if b.Telemetry.StatsdAddr != "" { result.Telemetry.StatsdAddr = b.Telemetry.StatsdAddr } diff --git a/agent/config_test.go b/agent/config_test.go index 42e4366a79..5b83c677d6 100644 --- a/agent/config_test.go +++ b/agent/config_test.go @@ -719,6 +719,18 @@ func TestDecodeConfig(t *testing.T) { in: `{"telemetry":{"dogstatsd_tags":["a","b"]}}`, c: &Config{Telemetry: Telemetry{DogStatsdTags: []string{"a", "b"}}}, }, + { + in: `{"telemetry":{"filter_default":true}}`, + c: &Config{Telemetry: Telemetry{FilterDefault: Bool(true)}}, + }, + { + in: `{"telemetry":{"prefix_filter":["+consul.metric","-consul.othermetric"]}}`, + c: &Config{Telemetry: Telemetry{ + PrefixFilter: []string{"+consul.metric", "-consul.othermetric"}, + AllowedPrefixes: []string{"consul.metric"}, + BlockedPrefixes: []string{"consul.othermetric"}, + }}, + }, { in: `{"telemetry":{"statsd_address":"a"}}`, c: &Config{Telemetry: Telemetry{StatsdAddr: "a"}}, diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index db7b6c7bfe..06ad854948 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -268,12 +268,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Provide some metrics if err == nil { - metrics.IncrCounter([]string{"consul", "catalog", "service", "query", args.ServiceName}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}}) if args.ServiceTag != "" { - metrics.IncrCounter([]string{"consul", "catalog", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query-tag"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}}) } if len(reply.ServiceNodes) == 0 { - metrics.IncrCounter([]string{"consul", "catalog", "service", "not-found", args.ServiceName}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "not-found"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}}) } } return err diff --git a/agent/consul/fsm.go b/agent/consul/fsm.go index b3b39a7f3f..9481ea35a1 100644 --- a/agent/consul/fsm.go +++ b/agent/consul/fsm.go @@ -172,7 +172,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) switch req.Op { case api.KVSet: return c.state.KVSSet(index, &req.DirEnt) @@ -216,7 +217,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) switch req.Op { case structs.SessionCreate: if err := c.state.SessionCreate(index, &req.Session); err != nil { @@ -236,7 +238,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) switch req.Op { case structs.ACLBootstrapInit: enabled, err := c.state.ACLBootstrapInit(index) @@ -267,7 +270,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{ if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) switch req.Op { case structs.TombstoneReap: return c.state.ReapTombstones(req.ReapIndex) @@ -301,7 +305,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf panic(fmt.Errorf("failed to decode request: %v", err)) } - defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) switch req.Op { case structs.PreparedQueryCreate, structs.PreparedQueryUpdate: return c.state.PreparedQuerySet(index, req.Query) diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 8d9d9ac46a..b930d3032e 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -139,12 +139,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Provide some metrics if err == nil { - metrics.IncrCounter([]string{"consul", "health", "service", "query", args.ServiceName}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}}) if args.ServiceTag != "" { - metrics.IncrCounter([]string{"consul", "health", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query-tag"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}}) } if len(reply.Nodes) == 0 { - metrics.IncrCounter([]string{"consul", "health", "service", "not-found", args.ServiceName}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "not-found"}, 1, + []metrics.Label{{Name: "service", Value: args.ServiceName}}) } } return err diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 9f63166c60..3fd544b41a 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -263,7 +263,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ return structs.ErrNoDCPath } - metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) + metrics.IncrCounterWithLabels([]string{"consul", "rpc", "cross-dc"}, 1, + []metrics.Label{{Name: "datacenter", Value: dc}}) if err := s.connPool.RPC(dc, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil { manager.NotifyFailedServer(server) s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err) diff --git a/agent/dns.go b/agent/dns.go index bf01bd1e85..808c02cca4 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -118,7 +118,8 @@ START: func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) { q := req.Question[0] defer func(s time.Time) { - metrics.MeasureSince([]string{"consul", "dns", "ptr_query", d.agent.config.NodeName}, s) + metrics.MeasureSinceWithLabels([]string{"consul", "dns", "ptr_query"}, s, + []metrics.Label{{Name: "node", Value: d.agent.config.NodeName}}) d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)", q, time.Now().Sub(s), resp.RemoteAddr().String(), resp.RemoteAddr().Network()) @@ -187,7 +188,8 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) { func (d *DNSServer) handleQuery(resp dns.ResponseWriter, req *dns.Msg) { q := req.Question[0] defer func(s time.Time) { - metrics.MeasureSince([]string{"consul", "dns", "domain_query", d.agent.config.NodeName}, s) + metrics.MeasureSinceWithLabels([]string{"consul", "dns", "domain_query"}, s, + []metrics.Label{{Name: "node", Value: d.agent.config.NodeName}}) d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)", q, time.Now().Sub(s), resp.RemoteAddr().String(), resp.RemoteAddr().Network()) diff --git a/agent/http.go b/agent/http.go index a42ce15e1b..2d43209b18 100644 --- a/agent/http.go +++ b/agent/http.go @@ -60,6 +60,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler { // Register the wrapper, which will close over the expensive-to-compute // parts from above. + // TODO (kyhavlov): Convert this to utilize metric labels in a major release wrapper := func(resp http.ResponseWriter, req *http.Request) { start := time.Now() handler(resp, req) @@ -97,6 +98,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler { handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) + handleFuncMetrics("/v1/agent/metrics", s.wrap(s.agent.MemSink.DisplayMetrics)) handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) diff --git a/api/agent.go b/api/agent.go index 383d2816ed..2f43d8eda2 100644 --- a/api/agent.go +++ b/api/agent.go @@ -96,6 +96,15 @@ type AgentToken struct { Token string } +// Metrics info is used to store different types of metric values from the agent. +type MetricsInfo struct { + Timestamp string + Gauges []map[string]interface{} + Points []map[string]interface{} + Counters []map[string]interface{} + Samples []map[string]interface{} +} + // Agent can be used to query the Agent endpoints type Agent struct { c *Client @@ -126,6 +135,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) { return out, nil } +// Metrics is used to query the agent we are speaking to for +// its current internal metric data +func (a *Agent) Metrics() (*MetricsInfo, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out *MetricsInfo + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") diff --git a/api/agent_test.go b/api/agent_test.go index 6e6292eee9..1b2cf2cba6 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -28,6 +28,28 @@ func TestAPI_AgentSelf(t *testing.T) { } } +func TestAPI_AgentMetrics(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + agent := c.Agent() + + metrics, err := agent.Metrics() + if err != nil { + t.Fatalf("err: %v", err) + } + + if len(metrics.Gauges) < 0 { + t.Fatalf("bad: %v", metrics) + } + + name := metrics.Gauges[0]["Name"] + if name != "consul.runtime.alloc_bytes" { + t.Fatalf("bad: %v", metrics.Gauges[0]) + } +} + func TestAPI_AgentReload(t *testing.T) { t.Parallel() diff --git a/command/agent.go b/command/agent.go index 66c5ef3082..fc31a53b7c 100644 --- a/command/agent.go +++ b/command/agent.go @@ -601,7 +601,7 @@ func circonusSink(config *agent.Config, hostname string) (metrics.MetricSink, er return sink, nil } -func startupTelemetry(config *agent.Config) error { +func startupTelemetry(config *agent.Config) (*metrics.InmemSink, error) { // Setup telemetry // Aggregate on 10 second intervals for 1 minute. Expose the // metrics over stderr when there is a SIGUSR1 received. @@ -609,6 +609,7 @@ func startupTelemetry(config *agent.Config) error { metrics.DefaultInmemSignal(memSink) metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix) metricsConf.EnableHostname = !config.Telemetry.DisableHostname + metricsConf.FilterDefault = *config.Telemetry.FilterDefault var sinks metrics.FanoutSink addSink := func(name string, fn func(*agent.Config, string) (metrics.MetricSink, error)) error { @@ -623,16 +624,16 @@ func startupTelemetry(config *agent.Config) error { } if err := addSink("statsite", statsiteSink); err != nil { - return err + return nil, err } if err := addSink("statsd", statsdSink); err != nil { - return err + return nil, err } if err := addSink("dogstatd", dogstatdSink); err != nil { - return err + return nil, err } if err := addSink("circonus", circonusSink); err != nil { - return err + return nil, err } if len(sinks) > 0 { @@ -642,7 +643,7 @@ func startupTelemetry(config *agent.Config) error { metricsConf.EnableHostname = false metrics.NewGlobal(metricsConf, memSink) } - return nil + return memSink, nil } func (cmd *AgentCommand) Run(args []string) int { @@ -682,7 +683,8 @@ func (cmd *AgentCommand) run(args []string) int { cmd.logOutput = logOutput cmd.logger = log.New(logOutput, "", log.LstdFlags) - if err := startupTelemetry(config); err != nil { + memSink, err := startupTelemetry(config) + if err != nil { cmd.UI.Error(err.Error()) return 1 } @@ -696,6 +698,7 @@ func (cmd *AgentCommand) run(args []string) int { } agent.LogOutput = logOutput agent.LogWriter = logWriter + agent.MemSink = memSink if err := agent.Start(); err != nil { cmd.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) diff --git a/website/source/api/agent.html.md b/website/source/api/agent.html.md index 5adee9766e..78876c84fa 100644 --- a/website/source/api/agent.html.md +++ b/website/source/api/agent.html.md @@ -249,6 +249,112 @@ $ curl \ https://consul.rocks/v1/agent/maintenance?enable=true&reason=For+API+docs ``` +## View Metrics + +This endpoint returns the configuration and member information of the local +agent. + +| Method | Path | Produces | +| ------ | ---------------------------- | -------------------------- | +| `GET` | `/agent/metrics` | `application/json` | + +This endpoint will dump the metrics for the most recent finished interval. +For more information about metrics, see the [telemetry](/docs/agent/telemetry.html) +page. + +| Blocking Queries | Consistency Modes | ACL Required | +| ---------------- | ----------------- | ------------ | +| `NO` | `none` | `agent:read` | + +### Sample Request + +```text +$ curl \ + https://consul.rocks/v1/agent/metrics +``` + +### Sample Response + +```json +{ + "Timestamp": "2017-08-08 02:55:10 +0000 UTC", + "Gauges": [ + { + "Name": "consul.consul.session_ttl.active", + "Value": 0, + "Labels": {} + }, + { + "Name": "consul.runtime.alloc_bytes", + "Value": 4704344, + "Labels": {} + }, + { + "Name": "consul.runtime.free_count", + "Value": 74063, + "Labels": {} + } + ], + "Points": [], + "Counters": [ + { + "Name": "consul.consul.catalog.service.query", + "Count": 1, + "Sum": 1, + "Min": 1, + "Max": 1, + "Mean": 1, + "Stddev": 0, + "Labels": { + "service": "consul" + } + }, + { + "Name": "consul.raft.apply", + "Count": 1, + "Sum": 1, + "Min": 1, + "Max": 1, + "Mean": 1, + "Stddev": 0, + "Labels": {} + } + ], + "Samples": [ + { + "Name": "consul.consul.http.GET.v1.agent.metrics", + "Count": 1, + "Sum": 0.1817069947719574, + "Min": 0.1817069947719574, + "Max": 0.1817069947719574, + "Mean": 0.1817069947719574, + "Stddev": 0, + "Labels": {} + }, + { + "Name": "consul.consul.http.GET.v1.catalog.service._", + "Count": 1, + "Sum": 0.23342099785804749, + "Min": 0.23342099785804749, + "Max": 0.23342099785804749, + "Mean": 0.23342099785804749, + "Stddev": 0, + "Labels": {} + }, + { + "Name": "consul.serf.queue.Query", + "Count": 20, + "Sum": 0, + "Min": 0, + "Max": 0, + "Mean": 0, + "Stddev": 0, + "Labels": {} + } + ] +} +``` + ## Stream Logs This endpoint streams logs from the local agent until the connection is closed. diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index 4dfb2616c4..fde27bdbf0 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -1130,6 +1130,24 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `disable_hostname` This controls whether or not to prepend runtime telemetry with the machine's hostname, defaults to false. + * `prefix_filter` + This is a list of filter rules to apply for allowing/blocking metrics by prefix in the following format: + + ```javascript + [ + "+consul.raft.apply", + "-consul.http", + "+consul.http.GET" + ] + ``` + A leading "+" will enable any metrics with the given prefix, and a leading "-" will block them. If there + is overlap between two rules, the more specific rule will take precedence. Blocking will take priority if the same + prefix is listed multiple times. + + * `filter_default` + This controls whether to allow metrics that have not been specified by the filter. Defaults to `true`, which will + allow all metrics when no filters are provided. When set to `false` with no filters, no metrics will be sent. + * `circonus_api_token` A valid API Token used to create/manage check. If provided, metric management is enabled.