diff --git a/.changelog/17460.txt b/.changelog/17460.txt new file mode 100644 index 0000000000..8e9c55517f --- /dev/null +++ b/.changelog/17460.txt @@ -0,0 +1,3 @@ +```release-note:feature +hcp: Add new metrics sink to collect, aggregate and export server metrics to HCP in OTEL format. +``` \ No newline at end of file diff --git a/agent/consul/server.go b/agent/consul/server.go index 6c6afc0154..c7d6d19c0e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -60,6 +60,7 @@ import ( agentgrpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/services/subscribe" "github.com/hashicorp/consul/agent/hcp" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" logdrop "github.com/hashicorp/consul/agent/log-drop" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" @@ -2027,7 +2028,7 @@ func (s *Server) trackLeaderChanges() { // hcpServerStatus is the callback used by the HCP manager to emit status updates to the HashiCorp Cloud Platform when // enabled. func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { - return func(ctx context.Context) (status hcp.ServerStatus, err error) { + return func(ctx context.Context) (status hcpclient.ServerStatus, err error) { status.Name = s.config.NodeName status.ID = string(s.config.NodeID) status.Version = cslversion.GetHumanVersion() diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index bd39e9676e..ebff789b14 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -27,8 +27,6 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" - "github.com/hashicorp/consul/agent/hcp" - "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/connect" @@ -36,6 +34,7 @@ import ( rpcRate "github.com/hashicorp/consul/agent/consul/rate" external "github.com/hashicorp/consul/agent/grpc-external" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" @@ -2082,10 +2081,10 @@ func TestServer_hcpManager(t *testing.T) { _, conf1 := testServerConfig(t) conf1.BootstrapExpect = 1 conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port} - hcp1 := hcp.NewMockClient(t) - hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcp.ServerStatus) bool { + hcp1 := hcpclient.NewMockClient(t) + hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcpclient.ServerStatus) bool { return status.ID == string(conf1.NodeID) - })).Run(func(ctx context.Context, status *hcp.ServerStatus) { + })).Run(func(ctx context.Context, status *hcpclient.ServerStatus) { require.Equal(t, status.LanAddress, "127.0.0.2") }).Call.Return(nil) diff --git a/agent/hcp/bootstrap/bootstrap.go b/agent/hcp/bootstrap/bootstrap.go index e6a1ae120a..38fa7d2274 100644 --- a/agent/hcp/bootstrap/bootstrap.go +++ b/agent/hcp/bootstrap/bootstrap.go @@ -23,7 +23,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/connect" - "github.com/hashicorp/consul/agent/hcp" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-uuid" @@ -65,7 +65,7 @@ type RawBootstrapConfig struct { // fetch from HCP servers if the local data is incomplete. // It must be passed a (CLI) UI implementation so it can deliver progress // updates to the user, for example if it is waiting to retry for a long period. -func LoadConfig(ctx context.Context, client hcp.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) { +func LoadConfig(ctx context.Context, client hcpclient.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) { ui.Output("Loading configuration from HCP") // See if we have existing config on disk @@ -181,14 +181,14 @@ func finalizeRuntimeConfig(rc *config.RuntimeConfig, cfg *RawBootstrapConfig) { // fetchBootstrapConfig will fetch boostrap configuration from remote servers and persist it to disk. // It will retry until successful or a terminal error condition is found (e.g. permission denied). -func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) { +func fetchBootstrapConfig(ctx context.Context, client hcpclient.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) { w := retry.Waiter{ MinWait: 1 * time.Second, MaxWait: 5 * time.Minute, Jitter: retry.NewJitter(50), } - var bsCfg *hcp.BootstrapConfig + var bsCfg *hcpclient.BootstrapConfig for { // Note we don't want to shadow `ctx` here since we need that for the Wait // below. @@ -225,7 +225,7 @@ func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string // persistAndProcessConfig is called when we receive data from CCM. // We validate and persist everything that was received, then also update // the JSON config as needed. -func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcp.BootstrapConfig) (string, error) { +func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcpclient.BootstrapConfig) (string, error) { if devMode { // Agent in dev mode, we still need somewhere to persist the certs // temporarily though to be able to start up at all since we don't support diff --git a/agent/hcp/bootstrap/bootstrap_test.go b/agent/hcp/bootstrap/bootstrap_test.go index 5d00d3372d..7cb46a32bf 100644 --- a/agent/hcp/bootstrap/bootstrap_test.go +++ b/agent/hcp/bootstrap/bootstrap_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/hcp" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/go-uuid" @@ -157,7 +158,7 @@ func TestLoadConfig_Persistence(t *testing.T) { // Override the client TLS config so that the test server can be trusted. initial.RuntimeConfig.Cloud.WithTLSConfig(clientTLS) - client, err := hcp.NewClient(initial.RuntimeConfig.Cloud) + client, err := hcpclient.NewClient(initial.RuntimeConfig.Cloud) require.NoError(t, err) loader, err := LoadConfig(context.Background(), client, initial.RuntimeConfig.DataDir, baseLoader, ui) diff --git a/agent/hcp/client.go b/agent/hcp/client/client.go similarity index 71% rename from agent/hcp/client.go rename to agent/hcp/client/client.go index bd6679a331..4203104500 100644 --- a/agent/hcp/client.go +++ b/agent/hcp/client/client.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package hcp +package client import ( "context" @@ -11,6 +11,8 @@ import ( httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" + + hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" "github.com/hashicorp/hcp-sdk-go/httpclient" @@ -20,15 +22,34 @@ import ( "github.com/hashicorp/consul/version" ) +// metricsGatewayPath is the default path for metrics export request on the Telemetry Gateway. +const metricsGatewayPath = "/v1/metrics" + // Client interface exposes HCP operations that can be invoked by Consul // //go:generate mockery --name Client --with-expecter --inpackage type Client interface { FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) + FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) PushServerStatus(ctx context.Context, status *ServerStatus) error DiscoverServers(ctx context.Context) ([]string, error) } +// MetricsConfig holds metrics specific configuration for the TelemetryConfig. +// The endpoint field overrides the TelemetryConfig endpoint. +type MetricsConfig struct { + Filters []string + Endpoint string +} + +// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers +// to the HCP Telemetry gateway. +type TelemetryConfig struct { + Endpoint string + Labels map[string]string + MetricsConfig *MetricsConfig +} + type BootstrapConfig struct { Name string BootstrapExpect int @@ -44,6 +65,7 @@ type hcpClient struct { hc *httptransport.Runtime cfg config.CloudConfig gnm hcpgnm.ClientService + tgw hcptelemetry.ClientService resource resource.Resource } @@ -64,6 +86,8 @@ func NewClient(cfg config.CloudConfig) (Client, error) { } client.gnm = hcpgnm.New(client.hc, nil) + client.tgw = hcptelemetry.New(client.hc, nil) + return client, nil } @@ -79,6 +103,29 @@ func httpClient(c config.CloudConfig) (*httptransport.Runtime, error) { }) } +// FetchTelemetryConfig obtains telemetry configuration from the Telemetry Gateway. +func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) { + params := hcptelemetry.NewAgentTelemetryConfigParamsWithContext(ctx). + WithLocationOrganizationID(c.resource.Organization). + WithLocationProjectID(c.resource.Project). + WithClusterID(c.resource.ID) + + resp, err := c.tgw.AgentTelemetryConfig(params, nil) + if err != nil { + return nil, err + } + + payloadConfig := resp.Payload.TelemetryConfig + return &TelemetryConfig{ + Endpoint: payloadConfig.Endpoint, + Labels: payloadConfig.Labels, + MetricsConfig: &MetricsConfig{ + Filters: payloadConfig.Metrics.IncludeList, + Endpoint: payloadConfig.Metrics.Endpoint, + }, + }, nil +} + func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) { version := version.GetHumanVersion() params := hcpgnm.NewAgentBootstrapConfigParamsWithContext(ctx). @@ -233,3 +280,32 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) { return servers, nil } + +// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved. +// It returns full metrics endpoint and true if a valid endpoint was obtained. +func (t *TelemetryConfig) Enabled() (string, bool) { + endpoint := t.Endpoint + if override := t.MetricsConfig.Endpoint; override != "" { + endpoint = override + } + + if endpoint == "" { + return "", false + } + + // The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added. + return endpoint + metricsGatewayPath, true +} + +// DefaultLabels returns a set of string pairs that must be added as attributes to all exported telemetry data. +func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string { + labels := map[string]string{ + "node_id": nodeID, // used to delineate Consul nodes in graphs + } + + for k, v := range t.Labels { + labels[k] = v + } + + return labels +} diff --git a/agent/hcp/client/client_test.go b/agent/hcp/client/client_test.go new file mode 100644 index 0000000000..43ecf0fd5c --- /dev/null +++ b/agent/hcp/client/client_test.go @@ -0,0 +1,75 @@ +package client + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestFetchTelemetryConfig(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + metricsEndpoint string + expect func(*MockClient) + disabled bool + }{ + "success": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + metricsEndpoint: "https://test.com/v1/metrics", + }, + "overrideMetricsEndpoint": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &MetricsConfig{ + Endpoint: "https://test.com", + }, + }, nil) + }, + metricsEndpoint: "https://test.com/v1/metrics", + }, + "disabledWithEmptyEndpoint": { + expect: func(mockClient *MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{ + Endpoint: "", + MetricsConfig: &MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + disabled: true, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + mock := NewMockClient(t) + test.expect(mock) + + telemetryCfg, err := mock.FetchTelemetryConfig(context.Background()) + require.NoError(t, err) + + if test.disabled { + endpoint, ok := telemetryCfg.Enabled() + require.False(t, ok) + require.Empty(t, endpoint) + return + } + + endpoint, ok := telemetryCfg.Enabled() + + require.True(t, ok) + require.Equal(t, test.metricsEndpoint, endpoint) + }) + } +} diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go new file mode 100644 index 0000000000..7e19c9857a --- /dev/null +++ b/agent/hcp/client/metrics_client.go @@ -0,0 +1,157 @@ +package client + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-retryablehttp" + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/hashicorp/hcp-sdk-go/resource" + colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + "golang.org/x/oauth2" + "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/version" +) + +const ( + // HTTP Client config + defaultStreamTimeout = 15 * time.Second + + // Retry config + // TODO: Eventually, we'd like to configure these values dynamically. + defaultRetryWaitMin = 1 * time.Second + defaultRetryWaitMax = 15 * time.Second + // defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible. + // This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now. + defaultRetryMax = 0 +) + +// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway. +type MetricsClient interface { + ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error +} + +// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. +type CloudConfig interface { + HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) + Resource() (resource.Resource, error) +} + +// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. +// It also holds default HTTP headers to add to export requests. +type otlpClient struct { + client *retryablehttp.Client + header *http.Header +} + +// NewMetricsClient returns a configured MetricsClient. +// The current implementation uses otlpClient to provide retry functionality. +func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) { + if cfg == nil { + return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)") + } + + if ctx == nil { + return nil, fmt.Errorf("failed to init telemetry client: provide a valid context") + } + + logger := hclog.FromContext(ctx) + + c, err := newHTTPClient(cfg, logger) + if err != nil { + return nil, fmt.Errorf("failed to init telemetry client: %v", err) + } + + r, err := cfg.Resource() + if err != nil { + return nil, fmt.Errorf("failed to init telemetry client: %v", err) + } + + header := make(http.Header) + header.Set("content-type", "application/x-protobuf") + header.Set("x-hcp-resource-id", r.String()) + header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) + + return &otlpClient{ + client: c, + header: &header, + }, nil +} + +// newHTTPClient configures the retryable HTTP client. +func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { + hcpCfg, err := cloudCfg.HCPConfig() + if err != nil { + return nil, err + } + + tlsTransport := cleanhttp.DefaultPooledTransport() + tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig() + + var transport http.RoundTripper = &oauth2.Transport{ + Base: tlsTransport, + Source: hcpCfg, + } + + client := &http.Client{ + Transport: transport, + Timeout: defaultStreamTimeout, + } + + retryClient := &retryablehttp.Client{ + HTTPClient: client, + Logger: logger.Named("hcp_telemetry_client"), + RetryWaitMin: defaultRetryWaitMin, + RetryWaitMax: defaultRetryWaitMax, + RetryMax: defaultRetryMax, + CheckRetry: retryablehttp.DefaultRetryPolicy, + Backoff: retryablehttp.DefaultBackoff, + } + + return retryClient, nil +} + +// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint. +// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config. +// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request. +func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { + pbRequest := &colmetricpb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, + } + + body, err := proto.Marshal(pbRequest) + if err != nil { + return fmt.Errorf("failed to marshal the request: %w", err) + } + + req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header = *o.header + + resp, err := o.client.Do(req.WithContext(ctx)) + if err != nil { + return fmt.Errorf("failed to post metrics: %w", err) + } + defer resp.Body.Close() + + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return fmt.Errorf("failed to read body: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body)) + } + + return nil +} diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go new file mode 100644 index 0000000000..e80996fcf5 --- /dev/null +++ b/agent/hcp/client/metrics_client_test.go @@ -0,0 +1,114 @@ +package client + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/version" +) + +func TestNewMetricsClient(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + cfg CloudConfig + ctx context.Context + }{ + "success": { + cfg: &MockCloudCfg{}, + ctx: context.Background(), + }, + "failsWithoutCloudCfg": { + wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)", + cfg: nil, + ctx: context.Background(), + }, + "failsWithoutContext": { + wantErr: "failed to init telemetry client: provide a valid context", + cfg: MockCloudCfg{}, + ctx: nil, + }, + "failsHCPConfig": { + wantErr: "failed to init telemetry client", + cfg: MockCloudCfg{ + ConfigErr: fmt.Errorf("test bad hcp config"), + }, + ctx: context.Background(), + }, + "failsBadResource": { + wantErr: "failed to init telemetry client", + cfg: MockCloudCfg{ + ResourceErr: fmt.Errorf("test bad resource"), + }, + ctx: context.Background(), + }, + } { + t.Run(name, func(t *testing.T) { + client, err := NewMetricsClient(test.cfg, test.ctx) + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.Nil(t, err) + require.NotNil(t, client) + }) + } +} + +func TestExportMetrics(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + status int + }{ + "success": { + status: http.StatusOK, + }, + "failsWithNonRetryableError": { + status: http.StatusBadRequest, + wantErr: "failed to export metrics: code 400", + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf") + require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID) + require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion())) + require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") + + body := colpb.ExportMetricsServiceResponse{} + bytes, err := proto.Marshal(&body) + + require.NoError(t, err) + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(test.status) + w.Write(bytes) + })) + defer srv.Close() + + client, err := NewMetricsClient(MockCloudCfg{}, context.Background()) + require.NoError(t, err) + + ctx := context.Background() + metrics := &metricpb.ResourceMetrics{} + err = client.ExportMetrics(ctx, metrics, srv.URL) + + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/agent/hcp/mock_Client.go b/agent/hcp/client/mock_Client.go similarity index 65% rename from agent/hcp/mock_Client.go rename to agent/hcp/client/mock_Client.go index 29bd27cbf1..06853ceb86 100644 --- a/agent/hcp/mock_Client.go +++ b/agent/hcp/client/mock_Client.go @@ -1,6 +1,6 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.22.1. DO NOT EDIT. -package hcp +package client import ( context "context" @@ -26,6 +26,10 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) { ret := _m.Called(ctx) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []string); ok { r0 = rf(ctx) } else { @@ -34,7 +38,6 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) { } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -67,11 +70,20 @@ func (_c *MockClient_DiscoverServers_Call) Return(_a0 []string, _a1 error) *Mock return _c } +func (_c *MockClient_DiscoverServers_Call) RunAndReturn(run func(context.Context) ([]string, error)) *MockClient_DiscoverServers_Call { + _c.Call.Return(run) + return _c +} + // FetchBootstrap provides a mock function with given fields: ctx func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) { ret := _m.Called(ctx) var r0 *BootstrapConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*BootstrapConfig, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) *BootstrapConfig); ok { r0 = rf(ctx) } else { @@ -80,7 +92,6 @@ func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, err } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -113,6 +124,65 @@ func (_c *MockClient_FetchBootstrap_Call) Return(_a0 *BootstrapConfig, _a1 error return _c } +func (_c *MockClient_FetchBootstrap_Call) RunAndReturn(run func(context.Context) (*BootstrapConfig, error)) *MockClient_FetchBootstrap_Call { + _c.Call.Return(run) + return _c +} + +// FetchTelemetryConfig provides a mock function with given fields: ctx +func (_m *MockClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) { + ret := _m.Called(ctx) + + var r0 *TelemetryConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*TelemetryConfig, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *TelemetryConfig); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*TelemetryConfig) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockClient_FetchTelemetryConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchTelemetryConfig' +type MockClient_FetchTelemetryConfig_Call struct { + *mock.Call +} + +// FetchTelemetryConfig is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) FetchTelemetryConfig(ctx interface{}) *MockClient_FetchTelemetryConfig_Call { + return &MockClient_FetchTelemetryConfig_Call{Call: _e.mock.On("FetchTelemetryConfig", ctx)} +} + +func (_c *MockClient_FetchTelemetryConfig_Call) Run(run func(ctx context.Context)) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockClient_FetchTelemetryConfig_Call) Return(_a0 *TelemetryConfig, _a1 error) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockClient_FetchTelemetryConfig_Call) RunAndReturn(run func(context.Context) (*TelemetryConfig, error)) *MockClient_FetchTelemetryConfig_Call { + _c.Call.Return(run) + return _c +} + // PushServerStatus provides a mock function with given fields: ctx, status func (_m *MockClient) PushServerStatus(ctx context.Context, status *ServerStatus) error { ret := _m.Called(ctx, status) @@ -151,6 +221,11 @@ func (_c *MockClient_PushServerStatus_Call) Return(_a0 error) *MockClient_PushSe return _c } +func (_c *MockClient_PushServerStatus_Call) RunAndReturn(run func(context.Context, *ServerStatus) error) *MockClient_PushServerStatus_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewMockClient interface { mock.TestingT Cleanup(func()) diff --git a/agent/hcp/client/mock_CloudConfig.go b/agent/hcp/client/mock_CloudConfig.go new file mode 100644 index 0000000000..5f2ef50046 --- /dev/null +++ b/agent/hcp/client/mock_CloudConfig.go @@ -0,0 +1,47 @@ +package client + +import ( + "crypto/tls" + "net/url" + + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/hashicorp/hcp-sdk-go/profile" + "github.com/hashicorp/hcp-sdk-go/resource" + "golang.org/x/oauth2" +) + +const testResourceID = "organization/test-org/project/test-project/test-type/test-id" + +type mockHCPCfg struct{} + +func (m *mockHCPCfg) Token() (*oauth2.Token, error) { + return &oauth2.Token{ + AccessToken: "test-token", + }, nil +} + +func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil } +func (m *mockHCPCfg) SCADAAddress() string { return "" } +func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} } +func (m *mockHCPCfg) APIAddress() string { return "" } +func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} } +func (m *mockHCPCfg) Profile() *profile.UserProfile { return nil } + +type MockCloudCfg struct { + ConfigErr error + ResourceErr error +} + +func (m MockCloudCfg) Resource() (resource.Resource, error) { + r := resource.Resource{ + ID: "test-id", + Type: "test-type", + Organization: "test-org", + Project: "test-project", + } + return r, m.ResourceErr +} + +func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { + return &mockHCPCfg{}, m.ConfigErr +} diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index a6d4c31979..8d1358fa4a 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -7,6 +7,7 @@ import ( "crypto/tls" hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/hashicorp/hcp-sdk-go/resource" ) // CloudConfig defines configuration for connecting to HCP services @@ -30,6 +31,10 @@ func (c *CloudConfig) WithTLSConfig(cfg *tls.Config) { c.TLSConfig = cfg } +func (c *CloudConfig) Resource() (resource.Resource, error) { + return resource.FromString(c.ResourceID) +} + func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { if c.TLSConfig == nil { c.TLSConfig = &tls.Config{} @@ -46,6 +51,6 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi if c.ScadaAddress != "" { opts = append(opts, hcpcfg.WithSCADA(c.ScadaAddress, c.TLSConfig)) } - opts = append(opts, hcpcfg.FromEnv()) + opts = append(opts, hcpcfg.FromEnv(), hcpcfg.WithoutBrowserLogin()) return hcpcfg.NewHCPConfig(opts...) } diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 6575b8d679..f4ad161dab 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -4,23 +4,94 @@ package hcp import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/armon/go-metrics" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" + "github.com/hashicorp/consul/agent/hcp/telemetry" + "github.com/hashicorp/consul/types" "github.com/hashicorp/go-hclog" ) // Deps contains the interfaces that the rest of Consul core depends on for HCP integration. type Deps struct { - Client Client + Client hcpclient.Client Provider scada.Provider + Sink metrics.MetricSink } -func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { - d.Client, err = NewClient(cfg) +func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (Deps, error) { + client, err := hcpclient.NewClient(cfg) + if err != nil { + return Deps{}, fmt.Errorf("failed to init client: %w:", err) + } + + provider, err := scada.New(cfg, logger.Named("scada")) + if err != nil { + return Deps{}, fmt.Errorf("failed to init scada: %w", err) + } + + sink := sink(client, &cfg, logger.Named("sink"), nodeID) + + return Deps{ + Client: client, + Provider: provider, + Sink: sink, + }, nil +} + +// sink provides initializes an OTELSink which forwards Consul metrics to HCP. +// The sink is only initialized if the server is registered with the management plane (CCM). +// This step should not block server initialization, so errors are logged, but not returned. +func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger, nodeID types.NodeID) metrics.MetricSink { + ctx := context.Background() + ctx = hclog.WithContext(ctx, logger) + + reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx) + if err != nil { + logger.Error("failed to fetch telemetry config", "error", err) + return nil + } + + endpoint, isEnabled := telemetryCfg.Enabled() + if !isEnabled { + return nil + } + + u, err := url.Parse(endpoint) + if err != nil { + logger.Error("failed to parse url endpoint", "error", err) + return nil + } + + metricsClient, err := hcpclient.NewMetricsClient(cfg, ctx) if err != nil { - return + logger.Error("failed to init metrics client", "error", err) + return nil } - d.Provider, err = scada.New(cfg, logger.Named("hcp.scada")) - return + sinkOpts := &telemetry.OTELSinkOpts{ + Ctx: ctx, + Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval), + Labels: telemetryCfg.DefaultLabels(string(nodeID)), + Filters: telemetryCfg.MetricsConfig.Filters, + } + + sink, err := telemetry.NewOTELSink(sinkOpts) + if err != nil { + logger.Error("failed to init OTEL sink", "error", err) + return nil + } + + logger.Debug("initialized HCP metrics sink") + + return sink } diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go new file mode 100644 index 0000000000..54ec7b6de4 --- /dev/null +++ b/agent/hcp/deps_test.go @@ -0,0 +1,106 @@ +package hcp + +import ( + "fmt" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/types" +) + +func TestSink(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + expect func(*client.MockClient) + mockCloudCfg client.CloudConfig + expectedSink bool + }{ + "success": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "https://test.com", + }, + }, nil) + }, + mockCloudCfg: client.MockCloudCfg{}, + expectedSink: true, + }, + "noSinkWhenServerNotRegisteredWithCCM": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + mockCloudCfg: client.MockCloudCfg{}, + }, + "noSinkWhenCCMVerificationFails": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed")) + }, + mockCloudCfg: client.MockCloudCfg{}, + }, + "noSinkWhenMetricsClientInitFails": { + mockCloudCfg: client.MockCloudCfg{ + ConfigErr: fmt.Errorf("test bad hcp config"), + }, + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "https://test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + }, + "failsWithFetchTelemetryFailure": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error")) + }, + }, + "failsWithURLParseErr": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + // Minimum 2 chars for a domain to be valid. + Endpoint: "s", + MetricsConfig: &client.MetricsConfig{ + // Invalid domain chars + Endpoint: " ", + }, + }, nil) + }, + }, + "noErrWithEmptyEndpoint": { + expect: func(mockClient *client.MockClient) { + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, + }, nil) + }, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + c := client.NewMockClient(t) + l := hclog.NewNullLogger() + test.expect(c) + sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234")) + if !test.expectedSink { + require.Nil(t, sinkOpts) + return + } + require.NotNil(t, sinkOpts) + }) + } +} diff --git a/agent/hcp/discover/discover.go b/agent/hcp/discover/discover.go index 43bf0c7719..12024b7dd6 100644 --- a/agent/hcp/discover/discover.go +++ b/agent/hcp/discover/discover.go @@ -9,7 +9,7 @@ import ( "log" "time" - "github.com/hashicorp/consul/agent/hcp" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" ) @@ -32,7 +32,7 @@ func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error return nil, err } - client, err := hcp.NewClient(cfg.CloudConfig) + client, err := hcpclient.NewClient(cfg.CloudConfig) if err != nil { return nil, err } diff --git a/agent/hcp/manager.go b/agent/hcp/manager.go index 9d5a2b44ab..0dc9db95da 100644 --- a/agent/hcp/manager.go +++ b/agent/hcp/manager.go @@ -8,6 +8,7 @@ import ( "sync" "time" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-hclog" ) @@ -18,7 +19,7 @@ var ( ) type ManagerConfig struct { - Client Client + Client hcpclient.Client StatusFn StatusCallback MinInterval time.Duration @@ -47,7 +48,7 @@ func (cfg *ManagerConfig) nextHeartbeat() time.Duration { return min + lib.RandomStagger(max-min) } -type StatusCallback func(context.Context) (ServerStatus, error) +type StatusCallback func(context.Context) (hcpclient.ServerStatus, error) type Manager struct { logger hclog.Logger diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index 41530b28af..4a3bdf582c 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,12 +16,12 @@ import ( ) func TestManager_Run(t *testing.T) { - client := NewMockClient(t) - statusF := func(ctx context.Context) (ServerStatus, error) { - return ServerStatus{ID: t.Name()}, nil + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil } updateCh := make(chan struct{}, 1) - client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Once() + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() mgr := NewManager(ManagerConfig{ Client: client, Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), @@ -43,14 +44,14 @@ func TestManager_Run(t *testing.T) { } func TestManager_SendUpdate(t *testing.T) { - client := NewMockClient(t) - statusF := func(ctx context.Context) (ServerStatus, error) { - return ServerStatus{ID: t.Name()}, nil + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil } updateCh := make(chan struct{}, 1) // Expect two calls, once during run startup and again when SendUpdate is called - client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Twice() + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice() mgr := NewManager(ManagerConfig{ Client: client, Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), @@ -73,14 +74,14 @@ func TestManager_SendUpdate(t *testing.T) { } func TestManager_SendUpdate_Periodic(t *testing.T) { - client := NewMockClient(t) - statusF := func(ctx context.Context) (ServerStatus, error) { - return ServerStatus{ID: t.Name()}, nil + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil } updateCh := make(chan struct{}, 1) // Expect two calls, once during run startup and again when SendUpdate is called - client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Twice() + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice() mgr := NewManager(ManagerConfig{ Client: client, Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), diff --git a/agent/hcp/telemetry/custom_metrics.go b/agent/hcp/telemetry/custom_metrics.go new file mode 100644 index 0000000000..746dc56cbe --- /dev/null +++ b/agent/hcp/telemetry/custom_metrics.go @@ -0,0 +1,14 @@ +package telemetry + +// Keys for custom Go Metrics metrics emitted only for the OTEL +// export (exporter.go) and transform (transform.go) failures and successes. +// These enable us to monitor OTEL operations. +var ( + internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"} + + internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"} + internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"} + + internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"} + internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"} +) diff --git a/agent/hcp/telemetry/doc.go b/agent/hcp/telemetry/doc.go new file mode 100644 index 0000000000..4ef18f39bd --- /dev/null +++ b/agent/hcp/telemetry/doc.go @@ -0,0 +1,12 @@ +// Package telemetry implements functionality to collect, aggregate, convert and export +// telemetry data in OpenTelemetry Protocol (OTLP) format. +// +// The entrypoint is the OpenTelemetry (OTEL) go-metrics sink which: +// - Receives metric data. +// - Aggregates metric data using the OTEL Go Metrics SDK. +// - Exports metric data using a configurable OTEL exporter. +// +// The package also provides an OTEL exporter implementation to be used within the sink, which: +// - Transforms metric data from the Metrics SDK OTEL representation to OTLP format. +// - Exports OTLP metric data to an external endpoint using a configurable client. +package telemetry diff --git a/agent/hcp/telemetry/filter.go b/agent/hcp/telemetry/filter.go new file mode 100644 index 0000000000..54dca7d44a --- /dev/null +++ b/agent/hcp/telemetry/filter.go @@ -0,0 +1,37 @@ +package telemetry + +import ( + "fmt" + "regexp" + "strings" + + "github.com/hashicorp/go-multierror" +) + +// newFilterRegex returns a valid regex used to filter metrics. +// It will fail if there are 0 valid regex filters given. +func newFilterRegex(filters []string) (*regexp.Regexp, error) { + var mErr error + validFilters := make([]string, 0, len(filters)) + for _, filter := range filters { + _, err := regexp.Compile(filter) + if err != nil { + mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err)) + continue + } + validFilters = append(validFilters, filter) + } + + if len(validFilters) == 0 { + return nil, multierror.Append(mErr, fmt.Errorf("no valid filters")) + } + + // Combine the valid regex strings with an OR. + finalRegex := strings.Join(validFilters, "|") + composedRegex, err := regexp.Compile(finalRegex) + if err != nil { + return nil, fmt.Errorf("failed to compile regex: %w", err) + } + + return composedRegex, nil +} diff --git a/agent/hcp/telemetry/filter_test.go b/agent/hcp/telemetry/filter_test.go new file mode 100644 index 0000000000..abe962f4cd --- /dev/null +++ b/agent/hcp/telemetry/filter_test.go @@ -0,0 +1,58 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFilter(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + filters []string + expectedRegexString string + matches []string + wantErr string + wantMatch bool + }{ + "badFilterRegex": { + filters: []string{"(*LF)"}, + wantErr: "no valid filters", + }, + "failsWithNoRegex": { + filters: []string{}, + wantErr: "no valid filters", + }, + "matchFound": { + filters: []string{"raft.*", "mem.*"}, + expectedRegexString: "raft.*|mem.*", + matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, + wantMatch: true, + }, + "matchNotFound": { + filters: []string{"mem.*"}, + matches: []string{"consul.raft.peers", "consul.txn.apply"}, + expectedRegexString: "mem.*", + wantMatch: false, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + f, err := newFilterRegex(tc.filters) + + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expectedRegexString, f.String()) + for _, metric := range tc.matches { + m := f.MatchString(metric) + require.Equal(t, tc.wantMatch, m) + } + }) + } +} diff --git a/agent/hcp/telemetry/gauge_store.go b/agent/hcp/telemetry/gauge_store.go new file mode 100644 index 0000000000..76dfb78066 --- /dev/null +++ b/agent/hcp/telemetry/gauge_store.go @@ -0,0 +1,77 @@ +package telemetry + +import ( + "context" + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// gaugeStore holds last seen Gauge values for a particular metric () in the store. +// OTEL does not currently have a synchronous Gauge instrument. Instead, it allows the registration of callbacks. +// The callbacks are called during export, where the Gauge value must be returned. +// This store is a workaround, which holds last seen Gauge values until the callback is called. +type gaugeStore struct { + store map[string]*gaugeValue + mutex sync.Mutex +} + +// gaugeValues are the last seen measurement for a Gauge metric, which contains a float64 value and labels. +type gaugeValue struct { + Value float64 + Attributes []attribute.KeyValue +} + +// NewGaugeStore returns an initialized empty gaugeStore. +func NewGaugeStore() *gaugeStore { + return &gaugeStore{ + store: make(map[string]*gaugeValue, 0), + } +} + +// LoadAndDelete will read a Gauge value and delete it. +// Once registered for a metric name, a Gauge callback will continue to execute every collection cycel. +// We must delete the value once we have read it, to avoid repeat values being sent. +func (g *gaugeStore) LoadAndDelete(key string) (*gaugeValue, bool) { + g.mutex.Lock() + defer g.mutex.Unlock() + + gauge, ok := g.store[key] + if !ok { + return nil, ok + } + + delete(g.store, key) + + return gauge, ok +} + +// Set adds a gaugeValue to the global gauge store. +func (g *gaugeStore) Set(key string, value float64, labels []attribute.KeyValue) { + g.mutex.Lock() + defer g.mutex.Unlock() + + gv := &gaugeValue{ + Value: value, + Attributes: labels, + } + + g.store[key] = gv +} + +// gaugeCallback returns a callback which gets called when metrics are collected for export. +func (g *gaugeStore) gaugeCallback(key string) metric.Float64Callback { + // Closures keep a reference to the key string, that get garbage collected when code completes. + return func(ctx context.Context, obs metric.Float64Observer) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if gauge, ok := g.LoadAndDelete(key); ok { + obs.Observe(gauge.Value, metric.WithAttributes(gauge.Attributes...)) + } + return nil + } + } +} diff --git a/agent/hcp/telemetry/gauge_store_test.go b/agent/hcp/telemetry/gauge_store_test.go new file mode 100644 index 0000000000..1171ee379c --- /dev/null +++ b/agent/hcp/telemetry/gauge_store_test.go @@ -0,0 +1,89 @@ +package telemetry + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" +) + +func TestGaugeStore(t *testing.T) { + t.Parallel() + + gaugeStore := NewGaugeStore() + + attributes := []attribute.KeyValue{ + { + Key: attribute.Key("test_key"), + Value: attribute.StringValue("test_value"), + }, + } + + gaugeStore.Set("test", 1.23, attributes) + + // Should store a new gauge. + val, ok := gaugeStore.LoadAndDelete("test") + require.True(t, ok) + require.Equal(t, val.Value, 1.23) + require.Equal(t, val.Attributes, attributes) + + // Gauge with key "test" have been deleted. + val, ok = gaugeStore.LoadAndDelete("test") + require.False(t, ok) + require.Nil(t, val) + + gaugeStore.Set("duplicate", 1.5, nil) + gaugeStore.Set("duplicate", 6.7, nil) + + // Gauge with key "duplicate" should hold the latest (last seen) value. + val, ok = gaugeStore.LoadAndDelete("duplicate") + require.True(t, ok) + require.Equal(t, val.Value, 6.7) +} + +func TestGaugeCallback_Failure(t *testing.T) { + t.Parallel() + + k := "consul.raft.apply" + gaugeStore := NewGaugeStore() + gaugeStore.Set(k, 1.23, nil) + + cb := gaugeStore.gaugeCallback(k) + ctx, cancel := context.WithCancel(context.Background()) + + cancel() + err := cb(ctx, nil) + require.ErrorIs(t, err, context.Canceled) +} + +// TestGaugeStore_Race induces a race condition. When run with go test -race, +// this test should pass if implementation is concurrency safe. +func TestGaugeStore_Race(t *testing.T) { + t.Parallel() + + gaugeStore := NewGaugeStore() + + wg := &sync.WaitGroup{} + samples := 100 + errCh := make(chan error, samples) + for i := 0; i < samples; i++ { + wg.Add(1) + key := fmt.Sprintf("consul.test.%d", i) + value := 12.34 + go func() { + defer wg.Done() + gaugeStore.Set(key, value, nil) + gv, _ := gaugeStore.LoadAndDelete(key) + if gv.Value != value { + errCh <- fmt.Errorf("expected value: '%f', but got: '%f' for key: '%s'", value, gv.Value, key) + } + }() + } + + wg.Wait() + + require.Empty(t, errCh) +} diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go new file mode 100644 index 0000000000..76c8f5b000 --- /dev/null +++ b/agent/hcp/telemetry/otel_exporter.go @@ -0,0 +1,81 @@ +package telemetry + +import ( + "context" + "fmt" + "net/url" + + goMetrics "github.com/armon/go-metrics" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + hcpclient "github.com/hashicorp/consul/agent/hcp/client" +) + +// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter. +// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics. +// This allows us to use a custom client - HCP authenticated MetricsClient. +type OTELExporter struct { + client hcpclient.MetricsClient + endpoint *url.URL +} + +// NewOTELExporter returns a configured OTELExporter +func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter { + return &OTELExporter{ + client: client, + endpoint: endpoint, + } +} + +// Temporality returns the Cumulative temporality for metrics aggregation. +// Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default. +func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality +} + +// Aggregation returns the Aggregation to use for an instrument kind. +// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics. +// This custom version replicates that logic, but removes the panic. +func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation { + switch kind { + case metric.InstrumentKindObservableGauge: + return aggregation.LastValue{} + case metric.InstrumentKindHistogram: + return aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + NoMinMax: false, + } + } + // for metric.InstrumentKindCounter and others, default to sum. + return aggregation.Sum{} +} + +// Export serializes and transmits metric data to a receiver. +func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error { + otlpMetrics := transformOTLP(metrics) + if isEmpty(otlpMetrics) { + return nil + } + err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) + if err != nil { + goMetrics.IncrCounter(internalMetricExportFailure, 1) + return fmt.Errorf("failed to export metrics: %w", err) + } + + goMetrics.IncrCounter(internalMetricExportSuccess, 1) + return nil +} + +// ForceFlush is a no-op, as the MetricsClient client holds no state. +func (e *OTELExporter) ForceFlush(ctx context.Context) error { + goMetrics.IncrCounter(internalMetricExporterForceFlush, 1) + return ctx.Err() +} + +// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown. +func (e *OTELExporter) Shutdown(ctx context.Context) error { + goMetrics.IncrCounter(internalMetricExporterShutdown, 1) + return ctx.Err() +} diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go new file mode 100644 index 0000000000..bc1a626f1c --- /dev/null +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -0,0 +1,208 @@ +package telemetry + +import ( + "context" + "fmt" + "net/url" + "strings" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + + "github.com/hashicorp/consul/agent/hcp/client" +) + +type mockMetricsClient struct { + exportErr error +} + +func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { + return m.exportErr +} + +func TestTemporality(t *testing.T) { + t.Parallel() + exp := &OTELExporter{} + require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter)) +} + +func TestAggregation(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + kind metric.InstrumentKind + expAgg aggregation.Aggregation + }{ + "gauge": { + kind: metric.InstrumentKindObservableGauge, + expAgg: aggregation.LastValue{}, + }, + "counter": { + kind: metric.InstrumentKindCounter, + expAgg: aggregation.Sum{}, + }, + "histogram": { + kind: metric.InstrumentKindHistogram, + expAgg: aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false}, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + exp := &OTELExporter{} + require.Equal(t, test.expAgg, exp.Aggregation(test.kind)) + }) + } +} + +func TestExport(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + wantErr string + metrics *metricdata.ResourceMetrics + client client.MetricsClient + }{ + "earlyReturnWithoutScopeMetrics": { + client: &mockMetricsClient{}, + metrics: mutateMetrics(nil), + }, + "earlyReturnWithoutMetrics": { + client: &mockMetricsClient{}, + metrics: mutateMetrics([]metricdata.ScopeMetrics{ + {Metrics: []metricdata.Metrics{}}, + }, + ), + }, + "errorWithExportFailure": { + client: &mockMetricsClient{ + exportErr: fmt.Errorf("failed to export metrics."), + }, + metrics: mutateMetrics([]metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + { + Name: "consul.raft.commitTime", + Data: metricdata.Gauge[float64]{}, + }, + }, + }, + }, + ), + wantErr: "failed to export metrics", + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + exp := NewOTELExporter(test.client, &url.URL{}) + + err := exp.Export(context.Background(), test.metrics) + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NoError(t, err) + }) + } +} + +// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted +// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal() +// sets a shared global sink. +func TestExport_CustomMetrics(t *testing.T) { + for name, tc := range map[string]struct { + client client.MetricsClient + metricKey []string + operation string + }{ + "exportSuccessEmitsCustomMetric": { + client: &mockMetricsClient{}, + metricKey: internalMetricExportSuccess, + operation: "export", + }, + "exportFailureEmitsCustomMetric": { + client: &mockMetricsClient{ + exportErr: fmt.Errorf("client err"), + }, + metricKey: internalMetricExportFailure, + operation: "export", + }, + "shutdownEmitsCustomMetric": { + metricKey: internalMetricExporterShutdown, + operation: "shutdown", + }, + "forceFlushEmitsCustomMetric": { + metricKey: internalMetricExporterForceFlush, + operation: "flush", + }, + } { + t.Run(name, func(t *testing.T) { + // Init global sink. + serviceName := "test.transform" + cfg := metrics.DefaultConfig(serviceName) + cfg.EnableHostname = false + + sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) + metrics.NewGlobal(cfg, sink) + + // Perform operation that emits metric. + exp := NewOTELExporter(tc.client, &url.URL{}) + + ctx := context.Background() + switch tc.operation { + case "flush": + exp.ForceFlush(ctx) + case "shutdown": + exp.Shutdown(ctx) + default: + exp.Export(ctx, inputResourceMetrics) + } + + // Collect sink metrics. + intervals := sink.Data() + require.Len(t, intervals, 1) + key := serviceName + "." + strings.Join(tc.metricKey, ".") + sv := intervals[0].Counters[key] + + // Verify count for transform failure metric. + require.NotNil(t, sv) + require.NotNil(t, sv.AggregateSample) + require.Equal(t, 1, sv.AggregateSample.Count) + }) + } +} + +func TestForceFlush(t *testing.T) { + t.Parallel() + exp := &OTELExporter{} + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := exp.ForceFlush(ctx) + require.ErrorIs(t, err, context.Canceled) +} + +func TestShutdown(t *testing.T) { + t.Parallel() + exp := &OTELExporter{} + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := exp.Shutdown(ctx) + require.ErrorIs(t, err, context.Canceled) +} + +func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics { + return &metricdata.ResourceMetrics{ + Resource: resource.Empty(), + ScopeMetrics: m, + } +} diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go new file mode 100644 index 0000000000..39e9aa599c --- /dev/null +++ b/agent/hcp/telemetry/otel_sink.go @@ -0,0 +1,245 @@ +package telemetry + +import ( + "bytes" + "context" + "fmt" + "net/url" + "regexp" + "strings" + "sync" + "time" + + gometrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + otelsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/hashicorp/consul/agent/hcp/client" +) + +// DefaultExportInterval is a default time interval between export of aggregated metrics. +const DefaultExportInterval = 10 * time.Second + +// OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink. +type OTELSinkOpts struct { + Reader otelsdk.Reader + Ctx context.Context + Filters []string + Labels map[string]string +} + +// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification. +// Metric data is exported in OpenTelemetry Protocol (OTLP) wire format. +// This should be used as a Go Metrics backend, as it implements the MetricsSink interface. +type OTELSink struct { + // spaceReplacer cleans the flattened key by removing any spaces. + spaceReplacer *strings.Replacer + logger hclog.Logger + filters *regexp.Regexp + + // meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK. + // It handles reading/export of aggregated metric data. + // It enables creation and usage of an OTEL Meter. + meterProvider *otelsdk.MeterProvider + + // meter is an OTEL Meter, which enables the creation of OTEL instruments. + meter *otelmetric.Meter + + // Instrument stores contain an OTEL Instrument per metric name () + // for each gauge, counter and histogram types. + // An instrument allows us to record a measurement for a particular metric, and continuously aggregates metrics. + // We lazy load the creation of these intruments until a metric is seen, and use them repeatedly to record measurements. + gaugeInstruments map[string]otelmetric.Float64ObservableGauge + counterInstruments map[string]otelmetric.Float64Counter + histogramInstruments map[string]otelmetric.Float64Histogram + + // gaugeStore is required to hold last-seen values of gauges + // This is a workaround, as OTEL currently does not have synchronous gauge instruments. + // It only allows the registration of "callbacks", which obtain values when the callback is called. + // We must hold gauge values until the callback is called, when the measurement is exported, and can be removed. + gaugeStore *gaugeStore + + mutex sync.Mutex +} + +// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds. +// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export +// metrics in OTLP format to an external url. +func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader { + exporter := NewOTELExporter(client, url) + return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) +} + +// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface. +// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which +// enable us to create OTEL Instruments to record measurements. +func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { + if opts.Reader == nil { + return nil, fmt.Errorf("ferror: provide valid reader") + } + + if opts.Ctx == nil { + return nil, fmt.Errorf("ferror: provide valid context") + } + + logger := hclog.FromContext(opts.Ctx).Named("otel_sink") + + filterList, err := newFilterRegex(opts.Filters) + if err != nil { + logger.Error("Failed to initialize all filters", "error", err) + } + + attrs := make([]attribute.KeyValue, 0, len(opts.Labels)) + for k, v := range opts.Labels { + kv := attribute.KeyValue{ + Key: attribute.Key(k), + Value: attribute.StringValue(v), + } + attrs = append(attrs, kv) + } + // Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically. + res := resource.NewWithAttributes("", attrs...) + meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) + meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") + + return &OTELSink{ + filters: filterList, + spaceReplacer: strings.NewReplacer(" ", "_"), + logger: logger, + meterProvider: meterProvider, + meter: &meter, + gaugeStore: NewGaugeStore(), + gaugeInstruments: make(map[string]otelmetric.Float64ObservableGauge, 0), + counterInstruments: make(map[string]otelmetric.Float64Counter, 0), + histogramInstruments: make(map[string]otelmetric.Float64Histogram, 0), + }, nil +} + +// SetGauge emits a Consul gauge metric. +func (o *OTELSink) SetGauge(key []string, val float32) { + o.SetGaugeWithLabels(key, val, nil) +} + +// AddSample emits a Consul histogram metric. +func (o *OTELSink) AddSample(key []string, val float32) { + o.AddSampleWithLabels(key, val, nil) +} + +// IncrCounter emits a Consul counter metric. +func (o *OTELSink) IncrCounter(key []string, val float32) { + o.IncrCounterWithLabels(key, val, nil) +} + +// AddSampleWithLabels emits a Consul gauge metric that gets +// registed by an OpenTelemetry Histogram instrument. +func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { + k := o.flattenKey(key) + + if !o.filters.MatchString(k) { + return + } + + // Set value in global Gauge store. + o.gaugeStore.Set(k, float64(val), toAttributes(labels)) + + o.mutex.Lock() + defer o.mutex.Unlock() + + // If instrument does not exist, create it and register callback to emit last value in global Gauge store. + if _, ok := o.gaugeInstruments[k]; !ok { + // The registration of a callback only needs to happen once, when the instrument is created. + // The callback will be triggered every export cycle for that metric. + // It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle. + inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k))) + if err != nil { + o.logger.Error("Failed to create gauge instrument", "error", err) + return + } + o.gaugeInstruments[k] = inst + } +} + +// AddSampleWithLabels emits a Consul sample metric that gets registed by an OpenTelemetry Histogram instrument. +func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { + k := o.flattenKey(key) + + if !o.filters.MatchString(k) { + return + } + + o.mutex.Lock() + defer o.mutex.Unlock() + + inst, ok := o.histogramInstruments[k] + if !ok { + histogram, err := (*o.meter).Float64Histogram(k) + if err != nil { + o.logger.Error("Failed create histogram instrument", "error", err) + return + } + inst = histogram + o.histogramInstruments[k] = inst + } + + attrs := toAttributes(labels) + inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) +} + +// IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument. +func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { + k := o.flattenKey(key) + + if !o.filters.MatchString(k) { + return + } + + o.mutex.Lock() + defer o.mutex.Unlock() + + inst, ok := o.counterInstruments[k] + if !ok { + counter, err := (*o.meter).Float64Counter(k) + if err != nil { + o.logger.Error("Failed to create counter instrument:", "error", err) + return + } + + inst = counter + o.counterInstruments[k] = inst + } + + attrs := toAttributes(labels) + inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) +} + +// EmitKey unsupported. +func (o *OTELSink) EmitKey(key []string, val float32) {} + +// flattenKey key along with its labels. +func (o *OTELSink) flattenKey(parts []string) string { + buf := &bytes.Buffer{} + joined := strings.Join(parts, ".") + + o.spaceReplacer.WriteString(buf, joined) + + return buf.String() +} + +// toAttributes converts go metrics Labels into OTEL format []attributes.KeyValue +func toAttributes(labels []gometrics.Label) []attribute.KeyValue { + if len(labels) == 0 { + return nil + } + attrs := make([]attribute.KeyValue, len(labels)) + for i, label := range labels { + attrs[i] = attribute.KeyValue{ + Key: attribute.Key(label.Name), + Value: attribute.StringValue(label.Value), + } + } + + return attrs +} diff --git a/agent/hcp/telemetry/otel_sink_test.go b/agent/hcp/telemetry/otel_sink_test.go new file mode 100644 index 0000000000..34127bdf9d --- /dev/null +++ b/agent/hcp/telemetry/otel_sink_test.go @@ -0,0 +1,409 @@ +package telemetry + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "testing" + + gometrics "github.com/armon/go-metrics" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" +) + +var ( + expectedResource = resource.NewWithAttributes("", attribute.KeyValue{ + Key: attribute.Key("node_id"), + Value: attribute.StringValue("test"), + }) + + attrs = attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("metric.label"), + Value: attribute.StringValue("test"), + }) + + expectedSinkMetrics = map[string]metricdata.Metrics{ + "consul.raft.leader": { + Name: "consul.raft.leader", + Description: "", + Unit: "", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Value: float64(float32(0)), + }, + }, + }, + }, + "consul.autopilot.healthy": { + Name: "consul.autopilot.healthy", + Description: "", + Unit: "", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attrs, + Value: float64(float32(1.23)), + }, + }, + }, + }, + "consul.raft.state.leader": { + Name: "consul.raft.state.leader", + Description: "", + Unit: "", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Value: float64(float32(23.23)), + }, + }, + }, + }, + "consul.raft.apply": { + Name: "consul.raft.apply", + Description: "", + Unit: "", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attrs, + Value: float64(float32(1.44)), + }, + }, + }, + }, + "consul.raft.leader.lastContact": { + Name: "consul.raft.leader.lastContact", + Description: "", + Unit: "", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Count: 1, + Sum: float64(float32(45.32)), + Min: metricdata.NewExtrema(float64(float32(45.32))), + Max: metricdata.NewExtrema(float64(float32(45.32))), + }, + }, + }, + }, + "consul.raft.commitTime": { + Name: "consul.raft.commitTime", + Description: "", + Unit: "", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attrs, + Count: 1, + Sum: float64(float32(26.34)), + Min: metricdata.NewExtrema(float64(float32(26.34))), + Max: metricdata.NewExtrema(float64(float32(26.34))), + }, + }, + }, + }, + } +) + +func TestNewOTELSink(t *testing.T) { + t.Parallel() + for name, test := range map[string]struct { + wantErr string + opts *OTELSinkOpts + }{ + "failsWithEmptyLogger": { + wantErr: "ferror: provide valid context", + opts: &OTELSinkOpts{ + Reader: metric.NewManualReader(), + }, + }, + "failsWithEmptyReader": { + wantErr: "ferror: provide valid reader", + opts: &OTELSinkOpts{ + Reader: nil, + Ctx: context.Background(), + }, + }, + "success": { + opts: &OTELSinkOpts{ + Ctx: context.Background(), + Reader: metric.NewManualReader(), + Labels: map[string]string{ + "server": "test", + }, + Filters: []string{"raft"}, + }, + }, + } { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + sink, err := NewOTELSink(test.opts) + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NotNil(t, sink) + }) + } +} + +func TestOTELSink(t *testing.T) { + t.Parallel() + + // Manual reader outputs the aggregated metrics when reader.Collect is called. + reader := metric.NewManualReader() + + ctx := context.Background() + opts := &OTELSinkOpts{ + Reader: reader, + Ctx: ctx, + Filters: []string{"raft", "autopilot"}, + Labels: map[string]string{ + "node_id": "test", + }, + } + + sink, err := NewOTELSink(opts) + require.NoError(t, err) + + labels := []gometrics.Label{ + { + Name: "metric.label", + Value: "test", + }, + } + + sink.SetGauge([]string{"consul", "raft", "leader"}, float32(0)) + sink.SetGaugeWithLabels([]string{"consul", "autopilot", "healthy"}, float32(1.23), labels) + + sink.IncrCounter([]string{"consul", "raft", "state", "leader"}, float32(23.23)) + sink.IncrCounterWithLabels([]string{"consul", "raft", "apply"}, float32(1.44), labels) + + sink.AddSample([]string{"consul", "raft", "leader", "lastContact"}, float32(45.32)) + sink.AddSampleWithLabels([]string{"consul", "raft", "commitTime"}, float32(26.34), labels) + + var collected metricdata.ResourceMetrics + err = reader.Collect(ctx, &collected) + require.NoError(t, err) + + isSame(t, expectedSinkMetrics, collected) +} + +func TestOTELSink_Race(t *testing.T) { + reader := metric.NewManualReader() + ctx := context.Background() + opts := &OTELSinkOpts{ + Ctx: ctx, + Reader: reader, + Labels: map[string]string{ + "node_id": "test", + }, + Filters: []string{"test"}, + } + + sink, err := NewOTELSink(opts) + require.NoError(t, err) + + samples := 100 + expectedMetrics := generateSamples(samples) + wg := &sync.WaitGroup{} + errCh := make(chan error, samples) + for k, v := range expectedMetrics { + wg.Add(1) + go func(k string, v metricdata.Metrics) { + defer wg.Done() + performSinkOperation(sink, k, v, errCh) + }(k, v) + } + wg.Wait() + + require.Empty(t, errCh) + + var collected metricdata.ResourceMetrics + err = reader.Collect(ctx, &collected) + require.NoError(t, err) + + isSame(t, expectedMetrics, collected) +} + +// generateSamples generates n of each gauges, counter and histogram measurements to use for test purposes. +func generateSamples(n int) map[string]metricdata.Metrics { + generated := make(map[string]metricdata.Metrics, 3*n) + + for i := 0; i < n; i++ { + v := 12.3 + k := fmt.Sprintf("consul.test.gauges.%d", i) + generated[k] = metricdata.Metrics{ + Name: k, + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Value: float64(float32(v)), + }, + }, + }, + } + } + + for i := 0; i < n; i++ { + v := 22.23 + k := fmt.Sprintf("consul.test.sum.%d", i) + generated[k] = metricdata.Metrics{ + Name: k, + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Value: float64(float32(v)), + }, + }, + }, + } + + } + + for i := 0; i < n; i++ { + v := 13.24 + k := fmt.Sprintf("consul.test.hist.%d", i) + generated[k] = metricdata.Metrics{ + Name: k, + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: *attribute.EmptySet(), + Sum: float64(float32(v)), + Max: metricdata.NewExtrema(float64(float32(v))), + Min: metricdata.NewExtrema(float64(float32(v))), + Count: 1, + }, + }, + }, + } + } + + return generated +} + +// performSinkOperation emits a measurement using the OTELSink and calls wg.Done() when completed. +func performSinkOperation(sink *OTELSink, k string, v metricdata.Metrics, errCh chan error) { + key := strings.Split(k, ".") + data := v.Data + switch data.(type) { + case metricdata.Gauge[float64]: + gauge, ok := data.(metricdata.Gauge[float64]) + if !ok { + errCh <- fmt.Errorf("unexpected type assertion error for key: %s", key) + } + sink.SetGauge(key, float32(gauge.DataPoints[0].Value)) + case metricdata.Sum[float64]: + sum, ok := data.(metricdata.Sum[float64]) + if !ok { + errCh <- fmt.Errorf("unexpected type assertion error for key: %s", key) + } + sink.IncrCounter(key, float32(sum.DataPoints[0].Value)) + case metricdata.Histogram[float64]: + hist, ok := data.(metricdata.Histogram[float64]) + if !ok { + errCh <- fmt.Errorf("unexpected type assertion error for key: %s", key) + } + sink.AddSample(key, float32(hist.DataPoints[0].Sum)) + } +} + +func isSame(t *testing.T, expectedMap map[string]metricdata.Metrics, actual metricdata.ResourceMetrics) { + // Validate resource + require.Equal(t, expectedResource, actual.Resource) + + // Validate Metrics + require.NotEmpty(t, actual.ScopeMetrics) + actualMetrics := actual.ScopeMetrics[0].Metrics + require.Equal(t, len(expectedMap), len(actualMetrics)) + + for _, actual := range actualMetrics { + name := actual.Name + expected, ok := expectedMap[actual.Name] + require.True(t, ok, "metric key %s should be in expectedMetrics map", name) + isSameMetrics(t, expected, actual) + } +} + +// compareMetrics verifies if two metricdata.Metric objects are equal by ignoring the time component. +// avoid duplicate datapoint values to ensure predictable order of sort. +func isSameMetrics(t *testing.T, expected metricdata.Metrics, actual metricdata.Metrics) { + require.Equal(t, expected.Name, actual.Name, "different .Name field") + require.Equal(t, expected.Description, actual.Description, "different .Description field") + require.Equal(t, expected.Unit, actual.Unit, "different .Unit field") + + switch expectedData := expected.Data.(type) { + case metricdata.Gauge[float64]: + actualData, ok := actual.Data.(metricdata.Gauge[float64]) + require.True(t, ok, "different metric types: expected metricdata.Gauge[float64]") + + isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints) + case metricdata.Sum[float64]: + actualData, ok := actual.Data.(metricdata.Sum[float64]) + require.True(t, ok, "different metric types: expected metricdata.Sum[float64]") + + isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints) + case metricdata.Histogram[float64]: + actualData, ok := actual.Data.(metricdata.Histogram[float64]) + require.True(t, ok, "different metric types: expected metricdata.Histogram") + + isSameHistogramData(t, expectedData.DataPoints, actualData.DataPoints) + } +} + +func isSameDataPoint(t *testing.T, expected []metricdata.DataPoint[float64], actual []metricdata.DataPoint[float64]) { + require.Equal(t, len(expected), len(actual), "different datapoints length") + + // Sort for predictable data in order of lowest value. + sort.Slice(expected, func(i, j int) bool { + return expected[i].Value < expected[j].Value + }) + sort.Slice(actual, func(i, j int) bool { + return expected[i].Value < expected[j].Value + }) + + // Only verify the value and attributes. + for i, dp := range expected { + currActual := actual[i] + require.Equal(t, dp.Value, currActual.Value, "different datapoint value") + require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes") + } +} + +func isSameHistogramData(t *testing.T, expected []metricdata.HistogramDataPoint[float64], actual []metricdata.HistogramDataPoint[float64]) { + require.Equal(t, len(expected), len(actual), "different histogram datapoint length") + + // Sort for predictable data in order of lowest sum. + sort.Slice(expected, func(i, j int) bool { + return expected[i].Sum < expected[j].Sum + }) + sort.Slice(actual, func(i, j int) bool { + return expected[i].Sum < expected[j].Sum + }) + + // Only verify the value and the attributes. + for i, dp := range expected { + currActual := actual[i] + require.Equal(t, dp.Sum, currActual.Sum, "different histogram datapoint .Sum value") + require.Equal(t, dp.Max, currActual.Max, "different histogram datapoint .Max value") + require.Equal(t, dp.Min, currActual.Min, "different histogram datapoint .Min value") + require.Equal(t, dp.Count, currActual.Count, "different histogram datapoint .Count value") + require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes") + } +} diff --git a/agent/hcp/telemetry/otlp_transform.go b/agent/hcp/telemetry/otlp_transform.go new file mode 100644 index 0000000000..76e20552a0 --- /dev/null +++ b/agent/hcp/telemetry/otlp_transform.go @@ -0,0 +1,186 @@ +package telemetry + +import ( + "errors" + "fmt" + + goMetrics "github.com/armon/go-metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + cpb "go.opentelemetry.io/proto/otlp/common/v1" + mpb "go.opentelemetry.io/proto/otlp/metrics/v1" + rpb "go.opentelemetry.io/proto/otlp/resource/v1" +) + +var ( + aggregationErr = errors.New("unsupported aggregation") + temporalityErr = errors.New("unsupported temporality") +) + +// isEmpty verifies if the given OTLP protobuf metrics contains metric data. +// isEmpty returns true if no ScopeMetrics exist or all metrics within ScopeMetrics are empty. +func isEmpty(rm *mpb.ResourceMetrics) bool { + // No ScopeMetrics + if len(rm.ScopeMetrics) == 0 { + return true + } + + // If any inner metrics contain data, return false. + for _, v := range rm.ScopeMetrics { + if len(v.Metrics) != 0 { + return false + } + } + + // All inner metrics are empty. + return true +} + +// TransformOTLP returns an OTLP ResourceMetrics generated from OTEL metrics. If rm +// contains invalid ScopeMetrics, an error will be returned along with an OTLP +// ResourceMetrics that contains partial OTLP ScopeMetrics. +func transformOTLP(rm *metricdata.ResourceMetrics) *mpb.ResourceMetrics { + sms := scopeMetricsToPB(rm.ScopeMetrics) + return &mpb.ResourceMetrics{ + Resource: &rpb.Resource{ + Attributes: attributesToPB(rm.Resource.Iter()), + }, + ScopeMetrics: sms, + } +} + +// scopeMetrics returns a slice of OTLP ScopeMetrics. +func scopeMetricsToPB(scopeMetrics []metricdata.ScopeMetrics) []*mpb.ScopeMetrics { + out := make([]*mpb.ScopeMetrics, 0, len(scopeMetrics)) + for _, sm := range scopeMetrics { + ms := metricsToPB(sm.Metrics) + out = append(out, &mpb.ScopeMetrics{ + Scope: &cpb.InstrumentationScope{ + Name: sm.Scope.Name, + Version: sm.Scope.Version, + }, + Metrics: ms, + }) + } + return out +} + +// metrics returns a slice of OTLP Metric generated from OTEL metrics sdk ones. +func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric { + out := make([]*mpb.Metric, 0, len(metrics)) + for _, m := range metrics { + o, err := metricTypeToPB(m) + if err != nil { + goMetrics.IncrCounter(internalMetricTransformFailure, 1) + continue + } + out = append(out, o) + } + return out +} + +// metricType identifies the instrument type and converts it to OTLP format. +// only float64 values are accepted since the go metrics sink only receives float64 values. +func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) { + out := &mpb.Metric{ + Name: m.Name, + Description: m.Description, + Unit: m.Unit, + } + switch a := m.Data.(type) { + case metricdata.Gauge[float64]: + out.Data = &mpb.Metric_Gauge{ + Gauge: &mpb.Gauge{ + DataPoints: dataPointsToPB(a.DataPoints), + }, + } + case metricdata.Sum[float64]: + if a.Temporality != metricdata.CumulativeTemporality { + return out, fmt.Errorf("error: %w: %T", temporalityErr, a) + } + out.Data = &mpb.Metric_Sum{ + Sum: &mpb.Sum{ + AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: a.IsMonotonic, + DataPoints: dataPointsToPB(a.DataPoints), + }, + } + case metricdata.Histogram[float64]: + if a.Temporality != metricdata.CumulativeTemporality { + return out, fmt.Errorf("error: %w: %T", temporalityErr, a) + } + out.Data = &mpb.Metric_Histogram{ + Histogram: &mpb.Histogram{ + AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: histogramDataPointsToPB(a.DataPoints), + }, + } + default: + return out, fmt.Errorf("error: %w: %T", aggregationErr, a) + } + return out, nil +} + +// DataPoints returns a slice of OTLP NumberDataPoint generated from OTEL metrics sdk ones. +func dataPointsToPB(dataPoints []metricdata.DataPoint[float64]) []*mpb.NumberDataPoint { + out := make([]*mpb.NumberDataPoint, 0, len(dataPoints)) + for _, dp := range dataPoints { + ndp := &mpb.NumberDataPoint{ + Attributes: attributesToPB(dp.Attributes.Iter()), + StartTimeUnixNano: uint64(dp.StartTime.UnixNano()), + TimeUnixNano: uint64(dp.Time.UnixNano()), + } + + ndp.Value = &mpb.NumberDataPoint_AsDouble{ + AsDouble: dp.Value, + } + out = append(out, ndp) + } + return out +} + +// HistogramDataPoints returns a slice of OTLP HistogramDataPoint from OTEL metrics sdk ones. +func histogramDataPointsToPB(dataPoints []metricdata.HistogramDataPoint[float64]) []*mpb.HistogramDataPoint { + out := make([]*mpb.HistogramDataPoint, 0, len(dataPoints)) + for _, dp := range dataPoints { + sum := dp.Sum + hdp := &mpb.HistogramDataPoint{ + Attributes: attributesToPB(dp.Attributes.Iter()), + StartTimeUnixNano: uint64(dp.StartTime.UnixNano()), + TimeUnixNano: uint64(dp.Time.UnixNano()), + Count: dp.Count, + Sum: &sum, + BucketCounts: dp.BucketCounts, + ExplicitBounds: dp.Bounds, + } + if v, ok := dp.Min.Value(); ok { + hdp.Min = &v + } + if v, ok := dp.Max.Value(); ok { + hdp.Max = &v + } + out = append(out, hdp) + } + return out +} + +// attributes transforms items of an attribute iterator into OTLP key-values. +// Currently, labels are only key-value pairs. +func attributesToPB(iter attribute.Iterator) []*cpb.KeyValue { + l := iter.Len() + if iter.Len() == 0 { + return nil + } + + out := make([]*cpb.KeyValue, 0, l) + for iter.Next() { + kv := iter.Attribute() + av := &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: kv.Value.AsString(), + }, + } + out = append(out, &cpb.KeyValue{Key: string(kv.Key), Value: av}) + } + return out +} diff --git a/agent/hcp/telemetry/otlp_transform_test.go b/agent/hcp/telemetry/otlp_transform_test.go new file mode 100644 index 0000000000..8f6beb7d48 --- /dev/null +++ b/agent/hcp/telemetry/otlp_transform_test.go @@ -0,0 +1,342 @@ +package telemetry + +import ( + "strings" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + cpb "go.opentelemetry.io/proto/otlp/common/v1" + mpb "go.opentelemetry.io/proto/otlp/metrics/v1" + rpb "go.opentelemetry.io/proto/otlp/resource/v1" +) + +var ( + // Common attributes for test cases. + start = time.Date(2000, time.January, 01, 0, 0, 0, 0, time.FixedZone("GMT", 0)) + end = start.Add(30 * time.Second) + + alice = attribute.NewSet(attribute.String("user", "alice")) + bob = attribute.NewSet(attribute.String("user", "bob")) + + pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + }} + pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + }} + + // DataPoint test case : Histogram Datapoints (Histogram) + minA, maxA, sumA = 2.0, 4.0, 90.0 + minB, maxB, sumB = 4.0, 150.0, 234.0 + inputHDP = []metricdata.HistogramDataPoint[float64]{{ + Attributes: alice, + StartTime: start, + Time: end, + Count: 30, + Bounds: []float64{1, 5}, + BucketCounts: []uint64{0, 30, 0}, + Min: metricdata.NewExtrema(minA), + Max: metricdata.NewExtrema(maxA), + Sum: sumA, + }, { + Attributes: bob, + StartTime: start, + Time: end, + Count: 3, + Bounds: []float64{1, 5}, + BucketCounts: []uint64{0, 1, 2}, + Min: metricdata.NewExtrema(minB), + Max: metricdata.NewExtrema(maxB), + Sum: sumB, + }} + + expectedHDP = []*mpb.HistogramDataPoint{{ + Attributes: []*cpb.KeyValue{pbAlice}, + StartTimeUnixNano: uint64(start.UnixNano()), + TimeUnixNano: uint64(end.UnixNano()), + Count: 30, + Sum: &sumA, + ExplicitBounds: []float64{1, 5}, + BucketCounts: []uint64{0, 30, 0}, + Min: &minA, + Max: &maxA, + }, { + Attributes: []*cpb.KeyValue{pbBob}, + StartTimeUnixNano: uint64(start.UnixNano()), + TimeUnixNano: uint64(end.UnixNano()), + Count: 3, + Sum: &sumB, + ExplicitBounds: []float64{1, 5}, + BucketCounts: []uint64{0, 1, 2}, + Min: &minB, + Max: &maxB, + }} + // DataPoint test case : Number Datapoints (Gauge / Counter) + inputDP = []metricdata.DataPoint[float64]{ + {Attributes: alice, StartTime: start, Time: end, Value: 1.0}, + {Attributes: bob, StartTime: start, Time: end, Value: 2.0}, + } + + expectedDP = []*mpb.NumberDataPoint{ + { + Attributes: []*cpb.KeyValue{pbAlice}, + StartTimeUnixNano: uint64(start.UnixNano()), + TimeUnixNano: uint64(end.UnixNano()), + Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 1.0}, + }, + { + Attributes: []*cpb.KeyValue{pbBob}, + StartTimeUnixNano: uint64(start.UnixNano()), + TimeUnixNano: uint64(end.UnixNano()), + Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 2.0}, + }, + } + + invalidSumTemporality = metricdata.Metrics{ + Name: "invalid-sum", + Description: "Sum with invalid temporality", + Unit: "1", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: false, + DataPoints: inputDP, + }, + } + + invalidSumAgg = metricdata.Metrics{ + Name: "unknown", + Description: "Unknown aggregation", + Unit: "1", + Data: metricdata.Sum[int64]{}, + } + + invalidHistTemporality = metricdata.Metrics{ + Name: "invalid-histogram", + Description: "Invalid histogram", + Unit: "1", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: inputHDP, + }, + } + + validFloat64Gauge = metricdata.Metrics{ + Name: "float64-gauge", + Description: "Gauge with float64 values", + Unit: "1", + Data: metricdata.Gauge[float64]{DataPoints: inputDP}, + } + + validFloat64Sum = metricdata.Metrics{ + Name: "float64-sum", + Description: "Sum with float64 values", + Unit: "1", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: inputDP, + }, + } + + validFloat64Hist = metricdata.Metrics{ + Name: "float64-histogram", + Description: "Histogram", + Unit: "1", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: inputHDP, + }, + } + + // Metrics Test Case + // - 3 invalid metrics and 3 Valid to test filtering + // - 1 invalid metric type + // - 2 invalid cummulative temporalities (only cummulative supported) + // - 3 types (Gauge, Counter, and Histogram) supported + inputMetrics = []metricdata.Metrics{ + validFloat64Gauge, + validFloat64Sum, + validFloat64Hist, + invalidSumTemporality, + invalidHistTemporality, + invalidSumAgg, + } + + expectedMetrics = []*mpb.Metric{ + { + Name: "float64-gauge", + Description: "Gauge with float64 values", + Unit: "1", + Data: &mpb.Metric_Gauge{Gauge: &mpb.Gauge{DataPoints: expectedDP}}, + }, + { + Name: "float64-sum", + Description: "Sum with float64 values", + Unit: "1", + Data: &mpb.Metric_Sum{Sum: &mpb.Sum{ + AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: false, + DataPoints: expectedDP, + }}, + }, + { + Name: "float64-histogram", + Description: "Histogram", + Unit: "1", + Data: &mpb.Metric_Histogram{Histogram: &mpb.Histogram{ + AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: expectedHDP, + }}, + }, + } + + // ScopeMetrics Test Cases + inputScopeMetrics = []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{ + Name: "test/code/path", + Version: "v0.1.0", + }, + Metrics: inputMetrics, + }} + + expectedScopeMetrics = []*mpb.ScopeMetrics{{ + Scope: &cpb.InstrumentationScope{ + Name: "test/code/path", + Version: "v0.1.0", + }, + Metrics: expectedMetrics, + }} + + // ResourceMetrics Test Cases + inputResourceMetrics = &metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless( + semconv.ServiceName("test server"), + semconv.ServiceVersion("v0.1.0"), + ), + ScopeMetrics: inputScopeMetrics, + } + + expectedResourceMetrics = &mpb.ResourceMetrics{ + Resource: &rpb.Resource{ + Attributes: []*cpb.KeyValue{ + { + Key: "service.name", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + }, + }, + { + Key: "service.version", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + }, + }, + }, + }, + ScopeMetrics: expectedScopeMetrics, + } +) + +// TestTransformOTLP runs tests from the "bottom-up" of the metricdata data types. +func TestTransformOTLP(t *testing.T) { + t.Parallel() + // Histogram DataPoint Test Case (Histograms) + assert.Equal(t, expectedHDP, histogramDataPointsToPB(inputHDP)) + + // Number DataPoint Test Case (Counters / Gauges) + require.Equal(t, expectedDP, dataPointsToPB(inputDP)) + + // MetricType Error Test Cases + _, err := metricTypeToPB(invalidHistTemporality) + require.Error(t, err) + require.ErrorIs(t, err, temporalityErr) + + _, err = metricTypeToPB(invalidSumTemporality) + require.Error(t, err) + require.ErrorIs(t, err, temporalityErr) + + _, err = metricTypeToPB(invalidSumAgg) + require.Error(t, err) + require.ErrorIs(t, err, aggregationErr) + + // Metrics Test Case + m := metricsToPB(inputMetrics) + require.Equal(t, expectedMetrics, m) + require.Equal(t, len(expectedMetrics), 3) + + // Scope Metrics Test Case + sm := scopeMetricsToPB(inputScopeMetrics) + require.Equal(t, expectedScopeMetrics, sm) + + // // Resource Metrics Test Case + rm := transformOTLP(inputResourceMetrics) + require.Equal(t, expectedResourceMetrics, rm) +} + +// TestTransformOTLP_CustomMetrics tests that a custom metric (hcp.otel.transform.failure) is emitted +// when transform fails. This test cannot be run in parallel as the metrics.NewGlobal() +// sets a shared global sink. +func TestTransformOTLP_CustomMetrics(t *testing.T) { + for name, tc := range map[string]struct { + inputRM *metricdata.ResourceMetrics + expectedMetricCount int + }{ + "successNoMetric": { + inputRM: &metricdata.ResourceMetrics{ + // 3 valid metrics. + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + validFloat64Gauge, + validFloat64Hist, + validFloat64Sum, + }, + }, + }, + }, + }, + "failureEmitsMetric": { + // inputScopeMetrics contains 3 bad metrics. + inputRM: inputResourceMetrics, + expectedMetricCount: 3, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + // Init global sink. + serviceName := "test.transform" + cfg := metrics.DefaultConfig(serviceName) + cfg.EnableHostname = false + + sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) + metrics.NewGlobal(cfg, sink) + + // Perform operation that emits metric. + transformOTLP(tc.inputRM) + + // Collect sink metrics. + intervals := sink.Data() + require.Len(t, intervals, 1) + key := serviceName + "." + strings.Join(internalMetricTransformFailure, ".") + sv := intervals[0].Counters[key] + + if tc.expectedMetricCount == 0 { + require.Empty(t, sv) + return + } + + // Verify count for transform failure metric. + require.NotNil(t, sv) + require.NotNil(t, sv.AggregateSample) + require.Equal(t, 3, sv.AggregateSample.Count) + }) + } +} diff --git a/agent/setup.go b/agent/setup.go index a4520e3cfc..9ed993aaf4 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" wal "github.com/hashicorp/raft-wal" @@ -101,7 +102,18 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries - d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger) + var extraSinks []metrics.MetricSink + if cfg.IsCloudEnabled() { + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.NodeID) + if err != nil { + return d, err + } + if d.HCP.Sink != nil { + extraSinks = append(extraSinks, d.HCP.Sink) + } + } + + d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...) if err != nil { return d, fmt.Errorf("failed to initialize telemetry: %w", err) } @@ -192,12 +204,6 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl d.EventPublisher = stream.NewEventPublisher(10 * time.Second) d.XDSStreamLimiter = limiter.NewSessionLimiter() - if cfg.IsCloudEnabled() { - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger) - if err != nil { - return d, err - } - } return d, nil } diff --git a/command/agent/agent.go b/command/agent/agent.go index f76e4337d5..c2d0dcea09 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -21,8 +21,8 @@ import ( "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/config" - "github.com/hashicorp/consul/agent/hcp" hcpbootstrap "github.com/hashicorp/consul/agent/hcp/bootstrap" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/command/cli" "github.com/hashicorp/consul/command/flags" "github.com/hashicorp/consul/lib" @@ -169,7 +169,7 @@ func (c *cmd) run(args []string) int { return 1 } if res.RuntimeConfig.IsCloudEnabled() { - client, err := hcp.NewClient(res.RuntimeConfig.Cloud) + client, err := hcpclient.NewClient(res.RuntimeConfig.Cloud) if err != nil { ui.Error("error building HCP HTTP client: " + err.Error()) return 1 diff --git a/go.mod b/go.mod index 2b672b7de1..89ef735d98 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/hashicorp/go-memdb v1.3.4 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-raftchunking v0.7.0 + github.com/hashicorp/go-retryablehttp v0.6.7 github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6 github.com/hashicorp/go-sockaddr v1.0.2 github.com/hashicorp/go-syslog v1.0.0 @@ -61,7 +62,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hcp-scada-provider v0.2.3 - github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3 + github.com/hashicorp/hcp-sdk-go v0.48.0 github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 github.com/hashicorp/memberlist v0.5.0 github.com/hashicorp/raft v1.5.0 @@ -93,14 +94,19 @@ require ( github.com/rboyer/safeio v0.2.1 github.com/ryanuber/columnize v2.1.2+incompatible github.com/shirou/gopsutil/v3 v3.22.8 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.3 go.etcd.io/bbolt v1.3.6 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/metric v1.16.0 + go.opentelemetry.io/otel/sdk v1.16.0 + go.opentelemetry.io/otel/sdk/metric v0.39.0 + go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/goleak v1.1.10 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d golang.org/x/net v0.8.0 golang.org/x/oauth2 v0.6.0 golang.org/x/sync v0.1.0 - golang.org/x/sys v0.6.0 + golang.org/x/sys v0.8.0 golang.org/x/time v0.3.0 google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737 google.golang.org/grpc v1.49.0 @@ -147,7 +153,7 @@ require ( github.com/dimchansky/utfbom v1.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/analysis v0.21.4 // indirect @@ -167,11 +173,11 @@ require ( github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/googleapis/gnostic v0.2.0 // indirect github.com/gophercloud/gophercloud v0.3.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect github.com/hashicorp/go-plugin v1.4.5 // indirect - github.com/hashicorp/go-retryablehttp v0.6.7 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect @@ -225,9 +231,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.mongodb.org/mongo-driver v1.11.0 // indirect go.opencensus.io v0.23.0 // indirect - go.opentelemetry.io/otel v1.11.1 // indirect - go.opentelemetry.io/otel/trace v1.11.1 // indirect - go.opentelemetry.io/proto/otlp v0.7.0 // indirect + go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect diff --git a/go.sum b/go.sum index 127c1350bf..59ebf57327 100644 --- a/go.sum +++ b/go.sum @@ -191,8 +191,10 @@ github.com/cloudflare/cloudflare-go v0.10.2/go.mod h1:qhVI5MKwBGhdNU89ZRz2plgYut github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -309,8 +311,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -394,6 +396,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -515,6 +519,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 h1:1ZEjnveDe20yFa6lSkfdQZm5BR/b271n0MsB5R2L3us= @@ -604,8 +610,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ= github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo= -github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3 h1:9QstZdsLIS6iPyYxQoyymRz8nBw9jMdEbGy29gtgzVQ= -github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= +github.com/hashicorp/hcp-sdk-go v0.48.0 h1:LWpFR7YVDz4uG4C/ixcy2tRbg7/BgjMcTh1bRkKaeBQ= +github.com/hashicorp/hcp-sdk-go v0.48.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -1005,8 +1011,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tencentcloud/tencentcloud-sdk-go v1.0.162 h1:8fDzz4GuVg4skjY2B0nMN7h6uN61EDVkuLyI2+qGHhI= github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= @@ -1073,13 +1080,19 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4= -go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE= -go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= -go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= -go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk= -go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= +go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= +go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -1230,6 +1243,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= @@ -1340,8 +1354,8 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= @@ -1537,6 +1551,7 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737 h1:K1zaaMdYBXRyX+cwFnxj7M6zwDyumLQMZ5xqwGvjreQ= google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737/go.mod h1:2r/26NEF3bFmT3eC3aZreahSal0C3Shl8Gi6vyDYqOQ= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -1570,6 +1585,7 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= +google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= diff --git a/lib/telemetry.go b/lib/telemetry.go index f6af973924..2d87707c33 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -324,7 +324,7 @@ func circonusSink(cfg TelemetryConfig, _ string) (metrics.MetricSink, error) { return sink, nil } -func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.FanoutSink, error) { +func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink, extraSinks []metrics.MetricSink) (metrics.FanoutSink, error) { metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) metricsConf.EnableHostname = !cfg.DisableHostname metricsConf.FilterDefault = cfg.FilterDefault @@ -349,6 +349,11 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.Fa addSink(dogstatdSink) addSink(circonusSink) addSink(prometheusSink) + for _, sink := range extraSinks { + if sink != nil { + sinks = append(sinks, sink) + } + } if len(sinks) > 0 { sinks = append(sinks, memSink) @@ -364,7 +369,7 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.Fa // values as returned by Runtimecfg.Config(). // InitTelemetry retries configurating the sinks in case error is retriable // and retry_failed_connection is set to true. -func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) { +func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger, extraSinks ...metrics.MetricSink) (*MetricsConfig, error) { if cfg.Disable { return nil, nil } @@ -384,7 +389,7 @@ func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, er } for { logger.Warn("retrying configure metric sinks", "retries", waiter.Failures()) - _, err := configureSinks(cfg, memSink) + _, err := configureSinks(cfg, memSink, extraSinks) if err == nil { logger.Info("successfully configured metrics sinks") return @@ -397,7 +402,7 @@ func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, er } } - if _, errs := configureSinks(cfg, memSink); errs != nil { + if _, errs := configureSinks(cfg, memSink, extraSinks); errs != nil { if isRetriableError(errs) && cfg.RetryFailedConfiguration { logger.Warn("failed configure sinks", "error", multierror.Flatten(errs)) ctx, cancel = context.WithCancel(context.Background()) diff --git a/lib/telemetry_test.go b/lib/telemetry_test.go index c8649f0fd7..a2c0075598 100644 --- a/lib/telemetry_test.go +++ b/lib/telemetry_test.go @@ -10,6 +10,8 @@ import ( "testing" "github.com/hashicorp/consul/logging" + + "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" "github.com/stretchr/testify/require" ) @@ -24,15 +26,16 @@ func newCfg() TelemetryConfig { func TestConfigureSinks(t *testing.T) { cfg := newCfg() - sinks, err := configureSinks(cfg, nil) + extraSinks := []metrics.MetricSink{&metrics.BlackholeSink{}} + sinks, err := configureSinks(cfg, nil, extraSinks) require.Error(t, err) - // 3 sinks: statsd, statsite, inmem - require.Equal(t, 3, len(sinks)) + // 4 sinks: statsd, statsite, inmem, extra sink (blackhole) + require.Equal(t, 4, len(sinks)) cfg = TelemetryConfig{ DogstatsdAddr: "", } - _, err = configureSinks(cfg, nil) + _, err = configureSinks(cfg, nil, nil) require.NoError(t, err) }