2021-10-08 17:31:50 +00:00
package agent
import (
2022-06-03 17:07:37 +00:00
"context"
2021-10-27 19:23:29 +00:00
"crypto/x509"
"fmt"
2021-10-08 17:31:50 +00:00
"net/http"
"net/http/httptest"
2022-11-10 16:26:01 +00:00
"os"
2021-10-27 19:23:29 +00:00
"path/filepath"
2021-10-08 17:31:50 +00:00
"strings"
"testing"
2022-06-03 17:07:37 +00:00
"time"
2021-10-27 19:23:29 +00:00
2022-06-29 16:46:37 +00:00
"github.com/stretchr/testify/require"
2022-03-31 17:49:37 +00:00
"github.com/hashicorp/consul/agent/rpc/middleware"
2022-06-03 17:07:37 +00:00
"github.com/hashicorp/consul/lib/retry"
2021-10-27 19:23:29 +00:00
"github.com/hashicorp/consul/sdk/testutil"
2021-10-27 19:56:38 +00:00
"github.com/hashicorp/consul/testrpc"
2021-10-27 19:23:29 +00:00
"github.com/hashicorp/consul/tlsutil"
2021-10-08 17:31:50 +00:00
)
2021-10-27 19:23:29 +00:00
func skipIfShortTesting ( t * testing . T ) {
2021-10-25 17:55:59 +00:00
if testing . Short ( ) {
t . Skip ( "too slow for testing.Short" )
}
}
func recordPromMetrics ( t * testing . T , a * TestAgent , respRec * httptest . ResponseRecorder ) {
2021-10-27 19:23:29 +00:00
t . Helper ( )
2021-10-25 17:55:59 +00:00
req , err := http . NewRequest ( "GET" , "/v1/agent/metrics?format=prometheus" , nil )
require . NoError ( t , err , "Failed to generate new http request." )
_ , err = a . srv . AgentMetrics ( respRec , req )
require . NoError ( t , err , "Failed to serve agent metrics" )
}
2022-03-17 23:02:26 +00:00
func assertMetricExists ( t * testing . T , respRec * httptest . ResponseRecorder , metric string ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
if ! strings . Contains ( respRec . Body . String ( ) , metric ) {
t . Fatalf ( "Could not find the metric \"%s\" in the /v1/agent/metrics response" , metric )
}
}
2022-09-15 17:04:37 +00:00
// assertMetricExistsWithLabels looks in the prometheus metrics response for the metric name and all the labels. eg:
2022-03-31 17:49:37 +00:00
// new_rpc_metrics_rpc_server_call{errored="false",method="Status.Ping",request_type="unknown",rpc_type="net/rpc"}
func assertMetricExistsWithLabels ( t * testing . T , respRec * httptest . ResponseRecorder , metric string , labelNames [ ] string ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
if ! strings . Contains ( respRec . Body . String ( ) , metric ) {
t . Fatalf ( "Could not find the metric \"%s\" in the /v1/agent/metrics response" , metric )
}
foundAllLabels := false
metrics := respRec . Body . String ( )
for _ , line := range strings . Split ( metrics , "\n" ) {
// skip help lines
if len ( line ) < 1 || line [ 0 ] == '#' {
continue
}
if strings . Contains ( line , metric ) {
hasAllLabels := true
for _ , labelName := range labelNames {
if ! strings . Contains ( line , labelName ) {
hasAllLabels = false
break
}
}
if hasAllLabels {
foundAllLabels = true
// done!
break
}
}
}
if ! foundAllLabels {
t . Fatalf ( "Could not verify that all named labels \"%s\" exist for the metric \"%s\" in the /v1/agent/metrics response" , strings . Join ( labelNames , ", " ) , metric )
}
}
func assertLabelWithValueForMetricExistsNTime ( t * testing . T , respRec * httptest . ResponseRecorder , metric string , label string , labelValue string , occurrences int ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
if ! strings . Contains ( respRec . Body . String ( ) , metric ) {
t . Fatalf ( "Could not find the metric \"%s\" in the /v1/agent/metrics response" , metric )
}
metrics := respRec . Body . String ( )
// don't look at _sum or _count or other aggregates
metricTarget := metric + "{"
// eg method="Status.Ping"
labelWithValueTarget := label + "=" + "\"" + labelValue + "\""
matchesFound := 0
for _ , line := range strings . Split ( metrics , "\n" ) {
// skip help lines
if len ( line ) < 1 || line [ 0 ] == '#' {
continue
}
if strings . Contains ( line , metricTarget ) {
if strings . Contains ( line , labelWithValueTarget ) {
matchesFound ++
}
}
}
if matchesFound < occurrences {
t . Fatalf ( "Only found metric \"%s\" %d times. Wanted %d times." , metric , matchesFound , occurrences )
}
}
2021-10-25 17:55:59 +00:00
func assertMetricExistsWithValue ( t * testing . T , respRec * httptest . ResponseRecorder , metric string , value string ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
// eg "consul_autopilot_healthy NaN"
target := metric + " " + value
if ! strings . Contains ( respRec . Body . String ( ) , target ) {
t . Fatalf ( "Could not find the metric \"%s\" with value \"%s\" in the /v1/agent/metrics response" , metric , value )
}
}
2022-09-15 17:04:37 +00:00
func assertMetricsWithLabelIsNonZero ( t * testing . T , respRec * httptest . ResponseRecorder , label , labelValue string ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
metrics := respRec . Body . String ( )
labelWithValueTarget := label + "=" + "\"" + labelValue + "\""
for _ , line := range strings . Split ( metrics , "\n" ) {
if len ( line ) < 1 || line [ 0 ] == '#' {
continue
}
if strings . Contains ( line , labelWithValueTarget ) {
s := strings . SplitN ( line , " " , 2 )
if s [ 1 ] == "0" {
t . Fatalf ( "Metric with label provided \"%s:%s\" has the value 0" , label , labelValue )
}
}
}
}
2021-10-25 17:55:59 +00:00
func assertMetricNotExists ( t * testing . T , respRec * httptest . ResponseRecorder , metric string ) {
if respRec . Body . String ( ) == "" {
t . Fatalf ( "Response body is empty." )
}
if strings . Contains ( respRec . Body . String ( ) , metric ) {
t . Fatalf ( "Didn't expect to find the metric \"%s\" in the /v1/agent/metrics response" , metric )
}
}
2022-03-31 17:49:37 +00:00
// TestAgent_OneTwelveRPCMetrics test for the 1.12 style RPC metrics. These are the labeled metrics coming from
2022-03-17 23:02:26 +00:00
// agent.rpc.middleware.interceptors package.
2022-03-31 17:49:37 +00:00
func TestAgent_OneTwelveRPCMetrics ( t * testing . T ) {
2022-03-17 23:02:26 +00:00
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
2022-03-31 17:49:37 +00:00
t . Run ( "Check that 1.12 rpc metrics are not emitted by default." , func ( t * testing . T ) {
2022-03-17 23:02:26 +00:00
metricsPrefix := "new_rpc_metrics"
hcl := fmt . Sprintf ( `
telemetry = {
prometheus_retention_time = "5s"
disable_hostname = true
metrics_prefix = "%s"
}
` , metricsPrefix )
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
var out struct { }
2022-12-14 15:24:22 +00:00
err := a . RPC ( context . Background ( ) , "Status.Ping" , struct { } { } , & out )
2022-03-17 23:02:26 +00:00
require . NoError ( t , err )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
2022-03-31 17:49:37 +00:00
assertMetricNotExists ( t , respRec , metricsPrefix + "_rpc_server_call" )
} )
t . Run ( "Check that 1.12 rpc metrics are emitted when specified by operator." , func ( t * testing . T ) {
metricsPrefix := "new_rpc_metrics_2"
allowRPCMetricRule := metricsPrefix + "." + strings . Join ( middleware . OneTwelveRPCSummary [ 0 ] . Name , "." )
hcl := fmt . Sprintf ( `
telemetry = {
prometheus_retention_time = "5s"
disable_hostname = true
metrics_prefix = "%s"
prefix_filter = [ "+%s" ]
}
` , metricsPrefix , allowRPCMetricRule )
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
var out struct { }
2022-12-14 15:24:22 +00:00
err := a . RPC ( context . Background ( ) , "Status.Ping" , struct { } { } , & out )
2022-03-31 17:49:37 +00:00
require . NoError ( t , err )
2022-12-14 15:24:22 +00:00
err = a . RPC ( context . Background ( ) , "Status.Ping" , struct { } { } , & out )
2022-03-31 17:49:37 +00:00
require . NoError ( t , err )
2022-12-14 15:24:22 +00:00
err = a . RPC ( context . Background ( ) , "Status.Ping" , struct { } { } , & out )
2022-03-31 17:49:37 +00:00
require . NoError ( t , err )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
// make sure the labels exist for this metric
2022-04-12 17:50:25 +00:00
assertMetricExistsWithLabels ( t , respRec , metricsPrefix + "_rpc_server_call" , [ ] string { "errored" , "method" , "request_type" , "rpc_type" , "leader" } )
2022-03-31 17:49:37 +00:00
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
assertLabelWithValueForMetricExistsNTime ( t , respRec , metricsPrefix + "_rpc_server_call" , "method" , "Status.Ping" , 3 )
2022-09-15 17:04:37 +00:00
// make sure rpc calls with elapsed time below 1ms are reported as decimal
assertMetricsWithLabelIsNonZero ( t , respRec , "method" , "Status.Ping" )
2022-03-17 23:02:26 +00:00
} )
}
2022-06-03 17:07:37 +00:00
func TestHTTPHandlers_AgentMetrics_LeaderShipMetrics ( t * testing . T ) {
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
t . Run ( "check that metric isLeader is set properly on server" , func ( t * testing . T ) {
hcl := `
telemetry = {
prometheus_retention_time = "5s" ,
metrics_prefix = "agent_is_leader"
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
retryWithBackoff := func ( expectedStr string ) error {
waiter := & retry . Waiter {
MaxWait : 1 * time . Minute ,
}
ctx := context . Background ( )
for {
if waiter . Failures ( ) > 7 {
return fmt . Errorf ( "reach max failure: %d" , waiter . Failures ( ) )
}
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
out := respRec . Body . String ( )
if strings . Contains ( out , expectedStr ) {
return nil
}
waiter . Wait ( ctx )
}
}
// agent hasn't become a leader
err := retryWithBackoff ( "isLeader 0" )
require . NoError ( t , err , "non-leader server should have isLeader 0" )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
// Verify agent's isLeader metrics is 1
err = retryWithBackoff ( "isLeader 1" )
require . NoError ( t , err , "leader should have isLeader 1" )
} )
}
2021-10-25 17:55:59 +00:00
// TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus adds testing around
2021-10-08 17:31:50 +00:00
// the published autopilot metrics on https://www.consul.io/docs/agent/telemetry#autopilot
2021-10-25 17:55:59 +00:00
func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus ( t * testing . T ) {
2021-10-27 19:23:29 +00:00
skipIfShortTesting ( t )
2021-10-08 17:31:50 +00:00
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
2021-10-25 17:55:59 +00:00
t . Run ( "Check consul_autopilot_* are not emitted metrics on clients" , func ( t * testing . T ) {
hcl := `
telemetry = {
prometheus_retention_time = "5s"
disable_hostname = true
metrics_prefix = "agent_1"
}
bootstrap = false
server = false
2021-10-08 17:31:50 +00:00
`
2021-10-25 17:55:59 +00:00
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
2021-10-08 17:31:50 +00:00
2021-10-25 17:55:59 +00:00
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
2021-10-08 17:31:50 +00:00
2021-10-25 17:55:59 +00:00
assertMetricNotExists ( t , respRec , "agent_1_autopilot_healthy" )
assertMetricNotExists ( t , respRec , "agent_1_autopilot_failure_tolerance" )
} )
2021-10-08 17:31:50 +00:00
t . Run ( "Check consul_autopilot_healthy metric value on startup" , func ( t * testing . T ) {
2021-10-25 17:55:59 +00:00
// don't bootstrap agent so as not to
// become a leader
hcl := `
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_2"
2021-10-08 17:31:50 +00:00
}
2021-10-25 17:55:59 +00:00
bootstrap = false
`
2021-10-08 17:31:50 +00:00
2021-10-25 17:55:59 +00:00
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
2022-04-07 14:48:48 +00:00
assertMetricExistsWithValue ( t , respRec , "agent_2_autopilot_healthy" , "1" )
assertMetricExistsWithValue ( t , respRec , "agent_2_autopilot_failure_tolerance" , "0" )
2021-10-25 17:55:59 +00:00
} )
2021-10-08 17:31:50 +00:00
}
2021-10-27 19:23:29 +00:00
func TestHTTPHandlers_AgentMetrics_TLSCertExpiry_Prometheus ( t * testing . T ) {
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
dir := testutil . TempDir ( t , "ca" )
caPEM , caPK , err := tlsutil . GenerateCA ( tlsutil . CAOpts { Days : 20 , Domain : "consul" } )
require . NoError ( t , err )
caPath := filepath . Join ( dir , "ca.pem" )
2022-11-10 16:26:01 +00:00
err = os . WriteFile ( caPath , [ ] byte ( caPEM ) , 0600 )
2021-10-27 19:23:29 +00:00
require . NoError ( t , err )
signer , err := tlsutil . ParseSigner ( caPK )
require . NoError ( t , err )
pem , key , err := tlsutil . GenerateCert ( tlsutil . CertOpts {
Signer : signer ,
CA : caPEM ,
Name : "server.dc1.consul" ,
Days : 20 ,
ExtKeyUsage : [ ] x509 . ExtKeyUsage { x509 . ExtKeyUsageClientAuth } ,
} )
require . NoError ( t , err )
certPath := filepath . Join ( dir , "cert.pem" )
2022-11-10 16:26:01 +00:00
err = os . WriteFile ( certPath , [ ] byte ( pem ) , 0600 )
2021-10-27 19:23:29 +00:00
require . NoError ( t , err )
keyPath := filepath . Join ( dir , "cert.key" )
2022-11-10 16:26:01 +00:00
err = os . WriteFile ( keyPath , [ ] byte ( key ) , 0600 )
2021-10-27 19:23:29 +00:00
require . NoError ( t , err )
hcl := fmt . Sprintf ( `
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_3"
}
ca_file = "%s"
cert_file = "%s"
key_file = "%s"
` , caPath , certPath , keyPath )
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . Contains ( t , respRec . Body . String ( ) , "agent_3_agent_tls_cert_expiry 1.7" )
}
2021-10-27 19:56:38 +00:00
func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus ( t * testing . T ) {
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
t . Run ( "non-leader emits NaN" , func ( t * testing . T ) {
hcl := `
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_4"
}
connect {
enabled = true
}
bootstrap = false
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . Contains ( t , respRec . Body . String ( ) , "agent_4_mesh_active_root_ca_expiry NaN" )
require . Contains ( t , respRec . Body . String ( ) , "agent_4_mesh_active_signing_ca_expiry NaN" )
} )
t . Run ( "leader emits a value" , func ( t * testing . T ) {
hcl := `
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_5"
}
connect {
enabled = true
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
out := respRec . Body . String ( )
require . Contains ( t , out , "agent_5_mesh_active_root_ca_expiry 3.15" )
require . Contains ( t , out , "agent_5_mesh_active_signing_ca_expiry 3.15" )
} )
}
2023-02-23 16:15:21 +00:00
func TestHTTPHandlers_AgentMetrics_WAL_Prometheus ( t * testing . T ) {
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
t . Run ( "client agent emits nothing" , func ( t * testing . T ) {
hcl := `
server = false
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_4"
}
raft_logstore {
backend = "wal"
}
bootstrap = false
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . NotContains ( t , respRec . Body . String ( ) , "agent_4_raft_wal" )
} )
t . Run ( "server with WAL enabled emits WAL metrics" , func ( t * testing . T ) {
hcl := `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_5"
}
connect {
enabled = true
}
raft_logstore {
backend = "wal"
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
out := respRec . Body . String ( )
require . Contains ( t , out , "agent_5_raft_wal_head_truncations" )
require . Contains ( t , out , "agent_5_raft_wal_last_segment_age_seconds" )
require . Contains ( t , out , "agent_5_raft_wal_log_appends" )
require . Contains ( t , out , "agent_5_raft_wal_log_entries_read" )
require . Contains ( t , out , "agent_5_raft_wal_log_entries_written" )
require . Contains ( t , out , "agent_5_raft_wal_log_entry_bytes_read" )
require . Contains ( t , out , "agent_5_raft_wal_log_entry_bytes_written" )
require . Contains ( t , out , "agent_5_raft_wal_segment_rotations" )
require . Contains ( t , out , "agent_5_raft_wal_stable_gets" )
require . Contains ( t , out , "agent_5_raft_wal_stable_sets" )
require . Contains ( t , out , "agent_5_raft_wal_tail_truncations" )
} )
t . Run ( "server without WAL enabled emits no WAL metrics" , func ( t * testing . T ) {
hcl := `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_6"
}
connect {
enabled = true
}
raft_logstore {
backend = "boltdb"
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . NotContains ( t , respRec . Body . String ( ) , "agent_6_raft_wal" )
} )
}
func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus ( t * testing . T ) {
skipIfShortTesting ( t )
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
t . Run ( "client agent emits nothing" , func ( t * testing . T ) {
hcl := `
server = false
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_4"
}
raft_logstore {
verification {
enabled = true
interval = "1s"
}
}
bootstrap = false
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . NotContains ( t , respRec . Body . String ( ) , "agent_4_raft_logstore_verifier" )
} )
t . Run ( "server with verifier enabled emits all metrics" , func ( t * testing . T ) {
hcl := `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_5"
}
connect {
enabled = true
}
raft_logstore {
verification {
enabled = true
interval = "1s"
}
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
out := respRec . Body . String ( )
require . Contains ( t , out , "agent_5_raft_logstore_verifier_checkpoints_written" )
require . Contains ( t , out , "agent_5_raft_logstore_verifier_dropped_reports" )
require . Contains ( t , out , "agent_5_raft_logstore_verifier_ranges_verified" )
require . Contains ( t , out , "agent_5_raft_logstore_verifier_read_checksum_failures" )
require . Contains ( t , out , "agent_5_raft_logstore_verifier_write_checksum_failures" )
} )
t . Run ( "server with verifier disabled emits no extra metrics" , func ( t * testing . T ) {
hcl := `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s" ,
disable_hostname = true
metrics_prefix = "agent_6"
}
connect {
enabled = true
}
raft_logstore {
verification {
enabled = false
}
}
`
a := StartTestAgent ( t , TestAgent { HCL : hcl } )
defer a . Shutdown ( )
testrpc . WaitForLeader ( t , a . RPC , "dc1" )
respRec := httptest . NewRecorder ( )
recordPromMetrics ( t , a , respRec )
require . NotContains ( t , respRec . Body . String ( ) , "agent_6_raft_logstore_verifier" )
} )
}