mirror of https://github.com/hashicorp/consul
* Move hcp client to subpackage hcpclient (#16800)

* [HCP Observability] New MetricsClient (#17100)
  * Client configured with TLS using HCP config and retry/throttle
  * Add tests and godoc for metrics client; close body after request; run go mod tidy
  * Remove one abstraction to use the config from deps
  * Extract CloudConfig and mock for future PR
  * Switch to hclog.FromContext

* [HCP Observability] OTELExporter (#17128)
  * Create new OTELExporter which uses the MetricsClient; add transform because the conversion is in an /internal package
  * Early return when there are no metrics; add NewOTELExporter() function
  * Add a general isEmpty method and clear error types; clean up error handling and clarify the empty metrics case
  * Fix tests by asserting the actual error for context cancellation, fix parallel, and make the mock more versatile
  * Fix input/expected naming in otel_transform_test.go; add comment for metric tracking
  * Update to OTEL metrics SDK v1.15.0

* [HCP Observability] OTELSink (#17159)
  * Initialize OTELSink instrument stores; switch from sync.Map to a mutex to avoid type assertions
  * Move PeriodicReader init to a NewOTELReader function so tests can use a ManualReader
  * Add a gauge store within the sink, with a NewGaugeStore method and a Set operation
  * Add select and ctx.Done() check to the gauge callback
  * Add concurrency and race tests for the sink and gauge store (error channel instead of a WaitGroup)
  * Pin OTEL to SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures, then update to the latest stable version
  * Add documentation to the OTELSink

* [HCP Observability] Init OTELSink in Telemetry (#17162)
  * Add telemetry agent to the client and initialize the sink in deps, within IsCloudEnabled()
  * Initialize deps before telemetry and use the concrete telemetry.OTELSink type
  * Append /v1/metrics to the endpoint and avoid returning an error from telemetry init
  * Switch to extra sinks in the telemetry library and pass extraSinks as a function param
  * Export the default interval, add clusterID, and set the logger in context
  * Remove the scheme from the endpoint path, fix error logging, and return metrics.MetricSink from the sink method
  * Update golden test for the configuration file; update the SDK

* [HCP Observability] Metrics filtering and labels in the Go Metrics sink (#17184)
  * Add node_id and __replica__ default labels; make defaultLabels a method on the TelemetryConfig object and use cfg.NodeID
  * Set the x-hcp-resource-id header and fix the x-resource-id test to use a mocked value
  * Rename FilterList to the unexported filterList and combine the filters into a single regex
  * Fix log.Error formats and add t.Parallel() to missing tests

* Remove replica tag (#17484)

* [HCP Observability] Add custom metrics for OTEL sink, improve logging, upgrade modules and clean up metrics client (#17455)
  * Add custom metrics for exporter and transform operations
  * Improve deps logging; upgrade the SDK and OTEL; run go mod tidy
  * Remove the partial-success implementation and check the HTTP status code in the metrics client
  * Add the x-channel header using version.GetHumanVersion; fix error wrapping and metric names

* [HCP Observability] Turn off retries for now until dynamically configurable (#17496)
  * Remove retries until dynamic configuration is possible
  * Update changelog

---------

Co-authored-by: Joshua Timmons <joshua.timmons1@gmail.com>

pull/17505/head
Ashvitha committed 2 years ago via GitHub
35 changed files with 2602 additions and 78 deletions
@@ -0,0 +1,3 @@
```release-note:feature
hcp: Add new metrics sink to collect, aggregate and export server metrics to HCP in OTEL format.
```
@@ -0,0 +1,75 @@
package client

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

func TestFetchTelemetryConfig(t *testing.T) {
	t.Parallel()
	for name, test := range map[string]struct {
		metricsEndpoint string
		expect          func(*MockClient)
		disabled        bool
	}{
		"success": {
			expect: func(mockClient *MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
					Endpoint: "https://test.com",
					MetricsConfig: &MetricsConfig{
						Endpoint: "",
					},
				}, nil)
			},
			metricsEndpoint: "https://test.com/v1/metrics",
		},
		"overrideMetricsEndpoint": {
			expect: func(mockClient *MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
					Endpoint: "https://test.com",
					MetricsConfig: &MetricsConfig{
						Endpoint: "https://test.com",
					},
				}, nil)
			},
			metricsEndpoint: "https://test.com/v1/metrics",
		},
		"disabledWithEmptyEndpoint": {
			expect: func(mockClient *MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
					Endpoint: "",
					MetricsConfig: &MetricsConfig{
						Endpoint: "",
					},
				}, nil)
			},
			disabled: true,
		},
	} {
		test := test
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			mock := NewMockClient(t)
			test.expect(mock)

			telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
			require.NoError(t, err)

			if test.disabled {
				endpoint, ok := telemetryCfg.Enabled()
				require.False(t, ok)
				require.Empty(t, endpoint)
				return
			}

			endpoint, ok := telemetryCfg.Enabled()

			require.True(t, ok)
			require.Equal(t, test.metricsEndpoint, endpoint)
		})
	}
}
@@ -0,0 +1,157 @@
package client

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/hashicorp/go-cleanhttp"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-retryablehttp"
	hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
	"github.com/hashicorp/hcp-sdk-go/resource"
	colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
	"golang.org/x/oauth2"
	"google.golang.org/protobuf/proto"

	"github.com/hashicorp/consul/version"
)

const (
	// HTTP Client config
	defaultStreamTimeout = 15 * time.Second

	// Retry config
	// TODO: Eventually, we'd like to configure these values dynamically.
	defaultRetryWaitMin = 1 * time.Second
	defaultRetryWaitMax = 15 * time.Second
	// defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible.
	// This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now.
	defaultRetryMax = 0
)

// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
	ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}

// CloudConfig represents the cloud config for TLS, abstracted in an interface for easy testing.
type CloudConfig interface {
	HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
	Resource() (resource.Resource, error)
}

// otlpClient is an implementation of MetricsClient with a retryable HTTP client for retries and to honor throttling.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
	client *retryablehttp.Client
	header *http.Header
}

// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) {
	if cfg == nil {
		return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
	}

	if ctx == nil {
		return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
	}

	logger := hclog.FromContext(ctx)

	c, err := newHTTPClient(cfg, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to init telemetry client: %v", err)
	}

	r, err := cfg.Resource()
	if err != nil {
		return nil, fmt.Errorf("failed to init telemetry client: %v", err)
	}

	header := make(http.Header)
	header.Set("content-type", "application/x-protobuf")
	header.Set("x-hcp-resource-id", r.String())
	header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))

	return &otlpClient{
		client: c,
		header: &header,
	}, nil
}

// newHTTPClient configures the retryable HTTP client.
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
	hcpCfg, err := cloudCfg.HCPConfig()
	if err != nil {
		return nil, err
	}

	tlsTransport := cleanhttp.DefaultPooledTransport()
	tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig()

	var transport http.RoundTripper = &oauth2.Transport{
		Base:   tlsTransport,
		Source: hcpCfg,
	}

	client := &http.Client{
		Transport: transport,
		Timeout:   defaultStreamTimeout,
	}

	retryClient := &retryablehttp.Client{
		HTTPClient:   client,
		Logger:       logger.Named("hcp_telemetry_client"),
		RetryWaitMin: defaultRetryWaitMin,
		RetryWaitMax: defaultRetryWaitMax,
		RetryMax:     defaultRetryMax,
		CheckRetry:   retryablehttp.DefaultRetryPolicy,
		Backoff:      retryablehttp.DefaultBackoff,
	}

	return retryClient, nil
}

// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
	pbRequest := &colmetricpb.ExportMetricsServiceRequest{
		ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
	}

	body, err := proto.Marshal(pbRequest)
	if err != nil {
		return fmt.Errorf("failed to marshal the request: %w", err)
	}

	req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header = *o.header

	resp, err := o.client.Do(req.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("failed to post metrics: %w", err)
	}
	defer resp.Body.Close()

	var respData bytes.Buffer
	if _, err := io.Copy(&respData, resp.Body); err != nil {
		return fmt.Errorf("failed to read body: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// Include the response body (not the request payload) in the error for easier debugging.
		return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, respData.String())
	}

	return nil
}
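A minimal usage sketch of the client above, not part of the diff: it assumes some concrete `CloudConfig` implementation is available (here called `cloudCfg`), and the endpoint URL is illustrative only.

```go
// Sketch only: construct the MetricsClient and export one OTLP payload.
package client

import (
	"context"

	"github.com/hashicorp/go-hclog"
	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

func exportOnce(cloudCfg CloudConfig) error {
	// The client reads its logger from the context via hclog.FromContext.
	ctx := hclog.WithContext(context.Background(), hclog.Default())

	mc, err := NewMetricsClient(cloudCfg, ctx)
	if err != nil {
		return err
	}

	// Normally produced by the telemetry package's OTLP transform.
	otlpMetrics := &metricpb.ResourceMetrics{}

	// A non-2xx response or a transport failure surfaces as an error.
	return mc.ExportMetrics(ctx, otlpMetrics, "https://hcp-metrics.example/v1/metrics")
}
```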
@@ -0,0 +1,114 @@
package client

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/require"
	colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
	"google.golang.org/protobuf/proto"

	"github.com/hashicorp/consul/version"
)

func TestNewMetricsClient(t *testing.T) {
	for name, test := range map[string]struct {
		wantErr string
		cfg     CloudConfig
		ctx     context.Context
	}{
		"success": {
			cfg: &MockCloudCfg{},
			ctx: context.Background(),
		},
		"failsWithoutCloudCfg": {
			wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)",
			cfg:     nil,
			ctx:     context.Background(),
		},
		"failsWithoutContext": {
			wantErr: "failed to init telemetry client: provide a valid context",
			cfg:     MockCloudCfg{},
			ctx:     nil,
		},
		"failsHCPConfig": {
			wantErr: "failed to init telemetry client",
			cfg: MockCloudCfg{
				ConfigErr: fmt.Errorf("test bad hcp config"),
			},
			ctx: context.Background(),
		},
		"failsBadResource": {
			wantErr: "failed to init telemetry client",
			cfg: MockCloudCfg{
				ResourceErr: fmt.Errorf("test bad resource"),
			},
			ctx: context.Background(),
		},
	} {
		t.Run(name, func(t *testing.T) {
			client, err := NewMetricsClient(test.cfg, test.ctx)
			if test.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), test.wantErr)
				return
			}

			require.Nil(t, err)
			require.NotNil(t, client)
		})
	}
}

func TestExportMetrics(t *testing.T) {
	for name, test := range map[string]struct {
		wantErr string
		status  int
	}{
		"success": {
			status: http.StatusOK,
		},
		"failsWithNonRetryableError": {
			status:  http.StatusBadRequest,
			wantErr: "failed to export metrics: code 400",
		},
	} {
		t.Run(name, func(t *testing.T) {
			srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf")
				require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
				require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion()))
				require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

				body := colpb.ExportMetricsServiceResponse{}
				bytes, err := proto.Marshal(&body)

				require.NoError(t, err)

				w.Header().Set("Content-Type", "application/x-protobuf")
				w.WriteHeader(test.status)
				w.Write(bytes)
			}))
			defer srv.Close()

			client, err := NewMetricsClient(MockCloudCfg{}, context.Background())
			require.NoError(t, err)

			ctx := context.Background()
			metrics := &metricpb.ResourceMetrics{}
			err = client.ExportMetrics(ctx, metrics, srv.URL)

			if test.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), test.wantErr)
				return
			}

			require.NoError(t, err)
		})
	}
}
@@ -0,0 +1,47 @@
package client

import (
	"crypto/tls"
	"net/url"

	hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
	"github.com/hashicorp/hcp-sdk-go/profile"
	"github.com/hashicorp/hcp-sdk-go/resource"
	"golang.org/x/oauth2"
)

const testResourceID = "organization/test-org/project/test-project/test-type/test-id"

type mockHCPCfg struct{}

func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
	return &oauth2.Token{
		AccessToken: "test-token",
	}, nil
}

func (m *mockHCPCfg) APITLSConfig() *tls.Config     { return nil }
func (m *mockHCPCfg) SCADAAddress() string          { return "" }
func (m *mockHCPCfg) SCADATLSConfig() *tls.Config   { return &tls.Config{} }
func (m *mockHCPCfg) APIAddress() string            { return "" }
func (m *mockHCPCfg) PortalURL() *url.URL           { return &url.URL{} }
func (m *mockHCPCfg) Profile() *profile.UserProfile { return nil }

type MockCloudCfg struct {
	ConfigErr   error
	ResourceErr error
}

func (m MockCloudCfg) Resource() (resource.Resource, error) {
	r := resource.Resource{
		ID:           "test-id",
		Type:         "test-type",
		Organization: "test-org",
		Project:      "test-project",
	}
	return r, m.ResourceErr
}

func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
	return &mockHCPCfg{}, m.ConfigErr
}
@@ -0,0 +1,106 @@
package hcp

import (
	"fmt"
	"testing"

	"github.com/hashicorp/go-hclog"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/agent/hcp/client"
	"github.com/hashicorp/consul/types"
)

func TestSink(t *testing.T) {
	t.Parallel()
	for name, test := range map[string]struct {
		expect       func(*client.MockClient)
		mockCloudCfg client.CloudConfig
		expectedSink bool
	}{
		"success": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
					Endpoint: "https://test.com",
					MetricsConfig: &client.MetricsConfig{
						Endpoint: "https://test.com",
					},
				}, nil)
			},
			mockCloudCfg: client.MockCloudCfg{},
			expectedSink: true,
		},
		"noSinkWhenServerNotRegisteredWithCCM": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
					Endpoint: "",
					MetricsConfig: &client.MetricsConfig{
						Endpoint: "",
					},
				}, nil)
			},
			mockCloudCfg: client.MockCloudCfg{},
		},
		"noSinkWhenCCMVerificationFails": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
			},
			mockCloudCfg: client.MockCloudCfg{},
		},
		"noSinkWhenMetricsClientInitFails": {
			mockCloudCfg: client.MockCloudCfg{
				ConfigErr: fmt.Errorf("test bad hcp config"),
			},
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
					Endpoint: "https://test.com",
					MetricsConfig: &client.MetricsConfig{
						Endpoint: "",
					},
				}, nil)
			},
		},
		"failsWithFetchTelemetryFailure": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error"))
			},
		},
		"failsWithURLParseErr": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
					// Minimum 2 chars for a domain to be valid.
					Endpoint: "s",
					MetricsConfig: &client.MetricsConfig{
						// Invalid domain chars
						Endpoint: " ",
					},
				}, nil)
			},
		},
		"noErrWithEmptyEndpoint": {
			expect: func(mockClient *client.MockClient) {
				mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
					Endpoint: "",
					MetricsConfig: &client.MetricsConfig{
						Endpoint: "",
					},
				}, nil)
			},
		},
	} {
		test := test
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			c := client.NewMockClient(t)
			l := hclog.NewNullLogger()
			test.expect(c)
			sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234"))
			if !test.expectedSink {
				require.Nil(t, sinkOpts)
				return
			}
			require.NotNil(t, sinkOpts)
		})
	}
}
@@ -0,0 +1,14 @@
package telemetry

// Keys for custom Go Metrics metrics emitted only for the OTEL
// export (exporter.go) and transform (transform.go) failures and successes.
// These enable us to monitor OTEL operations.
var (
	internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"}

	internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"}
	internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"}

	internalMetricExporterShutdown   []string = []string{"hcp", "otel", "exporter", "shutdown"}
	internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"}
)
@@ -0,0 +1,12 @@
// Package telemetry implements functionality to collect, aggregate, convert and export
// telemetry data in OpenTelemetry Protocol (OTLP) format.
//
// The entrypoint is the OpenTelemetry (OTEL) go-metrics sink which:
// - Receives metric data.
// - Aggregates metric data using the OTEL Go Metrics SDK.
// - Exports metric data using a configurable OTEL exporter.
//
// The package also provides an OTEL exporter implementation to be used within the sink, which:
// - Transforms metric data from the Metrics SDK OTEL representation to OTLP format.
// - Exports OTLP metric data to an external endpoint using a configurable client.
package telemetry
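To make the sink, SDK reader, exporter, and HCP client pipeline described above concrete, here is an illustrative wiring sketch. It is not part of the diff: the `NewOTELSink` call and the telemetry import path are assumptions based on the package layout, since this hunk does not show that constructor; `NewOTELReader`, `OTELSinkOpts`, `DefaultExportInterval`, and `NewMetricsClient` appear elsewhere in this change.

```go
// Illustrative wiring only; NewOTELSink's signature is assumed, and error
// handling is elided.
package main

import (
	"context"
	"net/url"

	"github.com/hashicorp/go-hclog"

	hcpclient "github.com/hashicorp/consul/agent/hcp/client"
	"github.com/hashicorp/consul/agent/hcp/telemetry" // path assumed from the package layout
)

func buildSink(cloudCfg hcpclient.CloudConfig) {
	ctx := hclog.WithContext(context.Background(), hclog.Default())

	mc, _ := hcpclient.NewMetricsClient(cloudCfg, ctx)
	u, _ := url.Parse("https://hcp-metrics.example/v1/metrics")

	// The reader drives periodic export through the custom OTELExporter.
	reader := telemetry.NewOTELReader(mc, u, telemetry.DefaultExportInterval)

	// Assumed constructor shape; filters and labels are illustrative.
	sink, _ := telemetry.NewOTELSink(&telemetry.OTELSinkOpts{
		Ctx:     ctx,
		Reader:  reader,
		Filters: []string{"raft", "rpc"},
		Labels:  map[string]string{"node_id": "server-1"},
	})

	// The sink can then be registered with go-metrics as an extra sink.
	_ = sink
}
```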
@@ -0,0 +1,37 @@
package telemetry

import (
	"fmt"
	"regexp"
	"strings"

	"github.com/hashicorp/go-multierror"
)

// newFilterRegex returns a valid regex used to filter metrics.
// It will fail if there are 0 valid regex filters given.
func newFilterRegex(filters []string) (*regexp.Regexp, error) {
	var mErr error
	validFilters := make([]string, 0, len(filters))
	for _, filter := range filters {
		_, err := regexp.Compile(filter)
		if err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err))
			continue
		}
		validFilters = append(validFilters, filter)
	}

	if len(validFilters) == 0 {
		return nil, multierror.Append(mErr, fmt.Errorf("no valid filters"))
	}

	// Combine the valid regex strings with an OR.
	finalRegex := strings.Join(validFilters, "|")
	composedRegex, err := regexp.Compile(finalRegex)
	if err != nil {
		return nil, fmt.Errorf("failed to compile regex: %w", err)
	}

	return composedRegex, nil
}
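A short sketch of the filter behavior: invalid patterns are skipped (and accumulated into a multierror that is only returned when nothing compiles), and the surviving patterns are OR-ed into one regex. `newFilterRegex` is unexported, so this would live inside the telemetry package; the metric names are illustrative.

```go
// Sketch only, not part of the diff.
package telemetry

import "fmt"

func filterSketch() {
	// "(*LF)" fails to compile and is skipped; an error is returned only when
	// no filter compiles at all.
	f, _ := newFilterRegex([]string{"raft.*", "(*LF)", "mem.*"})

	fmt.Println(f.String())                         // raft.*|mem.*
	fmt.Println(f.MatchString("consul.raft.peers")) // true
	fmt.Println(f.MatchString("consul.txn.apply"))  // false
}
```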
@@ -0,0 +1,58 @@
package telemetry

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestFilter(t *testing.T) {
	t.Parallel()
	for name, tc := range map[string]struct {
		filters             []string
		expectedRegexString string
		matches             []string
		wantErr             string
		wantMatch           bool
	}{
		"badFilterRegex": {
			filters: []string{"(*LF)"},
			wantErr: "no valid filters",
		},
		"failsWithNoRegex": {
			filters: []string{},
			wantErr: "no valid filters",
		},
		"matchFound": {
			filters:             []string{"raft.*", "mem.*"},
			expectedRegexString: "raft.*|mem.*",
			matches:             []string{"consul.raft.peers", "consul.mem.heap_size"},
			wantMatch:           true,
		},
		"matchNotFound": {
			filters:             []string{"mem.*"},
			matches:             []string{"consul.raft.peers", "consul.txn.apply"},
			expectedRegexString: "mem.*",
			wantMatch:           false,
		},
	} {
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			f, err := newFilterRegex(tc.filters)

			if tc.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.wantErr)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expectedRegexString, f.String())
			for _, metric := range tc.matches {
				m := f.MatchString(metric)
				require.Equal(t, tc.wantMatch, m)
			}
		})
	}
}
@@ -0,0 +1,77 @@
package telemetry

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// gaugeStore holds the last seen Gauge value for each metric (<name, last_value>) in the store.
// OTEL does not currently have a synchronous Gauge instrument. Instead, it allows the registration of callbacks.
// The callbacks are called during export, where the Gauge value must be returned.
// This store is a workaround, which holds last seen Gauge values until the callback is called.
type gaugeStore struct {
	store map[string]*gaugeValue
	mutex sync.Mutex
}

// gaugeValue is the last seen measurement for a Gauge metric, which contains a float64 value and labels.
type gaugeValue struct {
	Value      float64
	Attributes []attribute.KeyValue
}

// NewGaugeStore returns an initialized empty gaugeStore.
func NewGaugeStore() *gaugeStore {
	return &gaugeStore{
		store: make(map[string]*gaugeValue, 0),
	}
}

// LoadAndDelete will read a Gauge value and delete it.
// Once registered for a metric name, a Gauge callback will continue to execute every collection cycle.
// We must delete the value once we have read it, to avoid repeat values being sent.
func (g *gaugeStore) LoadAndDelete(key string) (*gaugeValue, bool) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	gauge, ok := g.store[key]
	if !ok {
		return nil, ok
	}

	delete(g.store, key)

	return gauge, ok
}

// Set adds a gaugeValue to the global gauge store.
func (g *gaugeStore) Set(key string, value float64, labels []attribute.KeyValue) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	gv := &gaugeValue{
		Value:      value,
		Attributes: labels,
	}

	g.store[key] = gv
}

// gaugeCallback returns a callback which gets called when metrics are collected for export.
func (g *gaugeStore) gaugeCallback(key string) metric.Float64Callback {
	// The closure keeps a reference to the key string, which is garbage collected once the callback is released.
	return func(ctx context.Context, obs metric.Float64Observer) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			if gauge, ok := g.LoadAndDelete(key); ok {
				obs.Observe(gauge.Value, metric.WithAttributes(gauge.Attributes...))
			}
			return nil
		}
	}
}
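Because OTEL has no synchronous gauge, the store-plus-callback pattern above works roughly as in this hedged sketch: the sink records the latest value, and an observable gauge registered with a `Float64Callback` reports it once at collection time. The meter and metric names are illustrative; the callback option is the standard `otel/metric` API.

```go
// Sketch only, not part of the diff.
package telemetry

import (
	"go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
)

func registerGaugeSketch(meter otelmetric.Meter) error {
	store := NewGaugeStore()

	// Record the latest value whenever go-metrics hands the sink a gauge.
	store.Set("consul.raft.leader.lastContact", 0.42, []attribute.KeyValue{
		attribute.String("node_id", "server-1"),
	})

	// Register the callback once per metric name; OTEL invokes it on every
	// collection, and LoadAndDelete ensures each value is reported only once.
	_, err := meter.Float64ObservableGauge(
		"consul.raft.leader.lastContact",
		otelmetric.WithFloat64Callback(store.gaugeCallback("consul.raft.leader.lastContact")),
	)
	return err
}
```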
@@ -0,0 +1,89 @@
package telemetry

import (
	"context"
	"fmt"
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/attribute"
)

func TestGaugeStore(t *testing.T) {
	t.Parallel()

	gaugeStore := NewGaugeStore()

	attributes := []attribute.KeyValue{
		{
			Key:   attribute.Key("test_key"),
			Value: attribute.StringValue("test_value"),
		},
	}

	gaugeStore.Set("test", 1.23, attributes)

	// Should store a new gauge.
	val, ok := gaugeStore.LoadAndDelete("test")
	require.True(t, ok)
	require.Equal(t, val.Value, 1.23)
	require.Equal(t, val.Attributes, attributes)

	// Gauge with key "test" has been deleted.
	val, ok = gaugeStore.LoadAndDelete("test")
	require.False(t, ok)
	require.Nil(t, val)

	gaugeStore.Set("duplicate", 1.5, nil)
	gaugeStore.Set("duplicate", 6.7, nil)

	// Gauge with key "duplicate" should hold the latest (last seen) value.
	val, ok = gaugeStore.LoadAndDelete("duplicate")
	require.True(t, ok)
	require.Equal(t, val.Value, 6.7)
}

func TestGaugeCallback_Failure(t *testing.T) {
	t.Parallel()

	k := "consul.raft.apply"
	gaugeStore := NewGaugeStore()
	gaugeStore.Set(k, 1.23, nil)

	cb := gaugeStore.gaugeCallback(k)
	ctx, cancel := context.WithCancel(context.Background())

	cancel()
	err := cb(ctx, nil)
	require.ErrorIs(t, err, context.Canceled)
}

// TestGaugeStore_Race induces a race condition. When run with go test -race,
// this test should pass if the implementation is concurrency safe.
func TestGaugeStore_Race(t *testing.T) {
	t.Parallel()

	gaugeStore := NewGaugeStore()

	wg := &sync.WaitGroup{}
	samples := 100
	errCh := make(chan error, samples)
	for i := 0; i < samples; i++ {
		wg.Add(1)
		key := fmt.Sprintf("consul.test.%d", i)
		value := 12.34
		go func() {
			defer wg.Done()
			gaugeStore.Set(key, value, nil)
			gv, _ := gaugeStore.LoadAndDelete(key)
			if gv.Value != value {
				errCh <- fmt.Errorf("expected value: '%f', but got: '%f' for key: '%s'", value, gv.Value, key)
			}
		}()
	}

	wg.Wait()

	require.Empty(t, errCh)
}
@@ -0,0 +1,81 @@
package telemetry

import (
	"context"
	"fmt"
	"net/url"

	goMetrics "github.com/armon/go-metrics"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"

	hcpclient "github.com/hashicorp/consul/agent/hcp/client"
)

// OTELExporter is a custom implementation of an OTEL Metrics SDK metrics.Exporter.
// The exporter is used by an OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - the HCP authenticated MetricsClient.
type OTELExporter struct {
	client   hcpclient.MetricsClient
	endpoint *url.URL
}

// NewOTELExporter returns a configured OTELExporter.
func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter {
	return &OTELExporter{
		client:   client,
		endpoint: endpoint,
	}
}

// Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use Cumulative aggregation as the default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
	return metricdata.CumulativeTemporality
}

// Aggregation returns the Aggregation to use for an instrument kind.
// The default implementation provided by the OTEL Metrics SDK library, DefaultAggregationSelector, panics.
// This custom version replicates that logic, but removes the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
	switch kind {
	case metric.InstrumentKindObservableGauge:
		return aggregation.LastValue{}
	case metric.InstrumentKindHistogram:
		return aggregation.ExplicitBucketHistogram{
			Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
			NoMinMax:   false,
		}
	}
	// for metric.InstrumentKindCounter and others, default to sum.
	return aggregation.Sum{}
}

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
	otlpMetrics := transformOTLP(metrics)
	if isEmpty(otlpMetrics) {
		return nil
	}
	err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
	if err != nil {
		goMetrics.IncrCounter(internalMetricExportFailure, 1)
		return fmt.Errorf("failed to export metrics: %w", err)
	}

	goMetrics.IncrCounter(internalMetricExportSuccess, 1)
	return nil
}

// ForceFlush is a no-op, as the MetricsClient holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
	goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
	return ctx.Err()
}

// Shutdown is a no-op, as the MetricsClient is an HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
	goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
	return ctx.Err()
}
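A hedged sketch of how this exporter plugs into the OTEL SDK's `PeriodicReader`, which is essentially what `NewOTELReader` (shown later in this change) does; `mc` and `endpoint` are placeholders for a real MetricsClient and telemetry endpoint.

```go
// Sketch only, not part of the diff.
package telemetry

import (
	"net/url"
	"time"

	otelsdk "go.opentelemetry.io/otel/sdk/metric"

	hcpclient "github.com/hashicorp/consul/agent/hcp/client"
)

func newReaderSketch(mc hcpclient.MetricsClient, endpoint *url.URL) otelsdk.Reader {
	exporter := NewOTELExporter(mc, endpoint)

	// The SDK calls exporter.Export on each interval tick; 10s matches
	// DefaultExportInterval in this change.
	return otelsdk.NewPeriodicReader(
		exporter,
		otelsdk.WithInterval(10*time.Second),
	)
}
```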
@@ -0,0 +1,208 @@
package telemetry

import (
	"context"
	"fmt"
	"net/url"
	"strings"
	"testing"
	"time"

	"github.com/armon/go-metrics"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/resource"
	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"

	"github.com/hashicorp/consul/agent/hcp/client"
)

type mockMetricsClient struct {
	exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
	return m.exportErr
}

func TestTemporality(t *testing.T) {
	t.Parallel()
	exp := &OTELExporter{}
	require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter))
}

func TestAggregation(t *testing.T) {
	t.Parallel()
	for name, test := range map[string]struct {
		kind   metric.InstrumentKind
		expAgg aggregation.Aggregation
	}{
		"gauge": {
			kind:   metric.InstrumentKindObservableGauge,
			expAgg: aggregation.LastValue{},
		},
		"counter": {
			kind:   metric.InstrumentKindCounter,
			expAgg: aggregation.Sum{},
		},
		"histogram": {
			kind:   metric.InstrumentKindHistogram,
			expAgg: aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false},
		},
	} {
		test := test
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			exp := &OTELExporter{}
			require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
		})
	}
}

func TestExport(t *testing.T) {
	t.Parallel()
	for name, test := range map[string]struct {
		wantErr string
		metrics *metricdata.ResourceMetrics
		client  client.MetricsClient
	}{
		"earlyReturnWithoutScopeMetrics": {
			client:  &mockMetricsClient{},
			metrics: mutateMetrics(nil),
		},
		"earlyReturnWithoutMetrics": {
			client: &mockMetricsClient{},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{Metrics: []metricdata.Metrics{}},
			}),
		},
		"errorWithExportFailure": {
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("failed to export metrics."),
			},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{
					Metrics: []metricdata.Metrics{
						{
							Name: "consul.raft.commitTime",
							Data: metricdata.Gauge[float64]{},
						},
					},
				},
			}),
			wantErr: "failed to export metrics",
		},
	} {
		test := test
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			exp := NewOTELExporter(test.client, &url.URL{})

			err := exp.Export(context.Background(), test.metrics)
			if test.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), test.wantErr)
				return
			}

			require.NoError(t, err)
		})
	}
}

// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel, as the metrics.NewGlobal()
// call sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
	for name, tc := range map[string]struct {
		client    client.MetricsClient
		metricKey []string
		operation string
	}{
		"exportSuccessEmitsCustomMetric": {
			client:    &mockMetricsClient{},
			metricKey: internalMetricExportSuccess,
			operation: "export",
		},
		"exportFailureEmitsCustomMetric": {
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("client err"),
			},
			metricKey: internalMetricExportFailure,
			operation: "export",
		},
		"shutdownEmitsCustomMetric": {
			metricKey: internalMetricExporterShutdown,
			operation: "shutdown",
		},
		"forceFlushEmitsCustomMetric": {
			metricKey: internalMetricExporterForceFlush,
			operation: "flush",
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Init global sink.
			serviceName := "test.transform"
			cfg := metrics.DefaultConfig(serviceName)
			cfg.EnableHostname = false

			sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
			metrics.NewGlobal(cfg, sink)

			// Perform an operation that emits the metric.
			exp := NewOTELExporter(tc.client, &url.URL{})

			ctx := context.Background()
			switch tc.operation {
			case "flush":
				exp.ForceFlush(ctx)
			case "shutdown":
				exp.Shutdown(ctx)
			default:
				exp.Export(ctx, inputResourceMetrics)
			}

			// Collect sink metrics.
			intervals := sink.Data()
			require.Len(t, intervals, 1)
			key := serviceName + "." + strings.Join(tc.metricKey, ".")
			sv := intervals[0].Counters[key]

			// Verify the count for the emitted custom metric.
			require.NotNil(t, sv)
			require.NotNil(t, sv.AggregateSample)
			require.Equal(t, 1, sv.AggregateSample.Count)
		})
	}
}

func TestForceFlush(t *testing.T) {
	t.Parallel()
	exp := &OTELExporter{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err := exp.ForceFlush(ctx)
	require.ErrorIs(t, err, context.Canceled)
}

func TestShutdown(t *testing.T) {
	t.Parallel()
	exp := &OTELExporter{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err := exp.Shutdown(ctx)
	require.ErrorIs(t, err, context.Canceled)
}

func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics {
	return &metricdata.ResourceMetrics{
		Resource:     resource.Empty(),
		ScopeMetrics: m,
	}
}
@@ -0,0 +1,245 @@
|
||||
package telemetry |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"fmt" |
||||
"net/url" |
||||
"regexp" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
gometrics "github.com/armon/go-metrics" |
||||
"github.com/hashicorp/go-hclog" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
otelmetric "go.opentelemetry.io/otel/metric" |
||||
otelsdk "go.opentelemetry.io/otel/sdk/metric" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/client" |
||||
) |
||||
|
||||
// DefaultExportInterval is the default time interval between exports of aggregated metrics.
||||
const DefaultExportInterval = 10 * time.Second |
||||
|
||||
// OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink.
|
||||
type OTELSinkOpts struct { |
||||
Reader otelsdk.Reader |
||||
Ctx context.Context |
||||
Filters []string |
||||
Labels map[string]string |
||||
} |
||||
|
||||
// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification.
|
||||
// Metric data is exported in OpenTelemetry Protocol (OTLP) wire format.
|
||||
// This should be used as a Go Metrics backend, as it implements the MetricsSink interface.
|
||||
type OTELSink struct { |
||||
// spaceReplacer cleans the flattened key by replacing any spaces with underscores.
||||
spaceReplacer *strings.Replacer |
||||
logger hclog.Logger |
||||
filters *regexp.Regexp |
||||
|
||||
// meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK.
|
||||
// It handles reading/export of aggregated metric data.
|
||||
// It enables creation and usage of an OTEL Meter.
|
||||
meterProvider *otelsdk.MeterProvider |
||||
|
||||
// meter is an OTEL Meter, which enables the creation of OTEL instruments.
|
||||
meter *otelmetric.Meter |
||||
|
||||
// Instrument stores contain an OTEL Instrument per metric name (<name, instrument>)
// for each of the gauge, counter and histogram types.
// An instrument allows us to record a measurement for a particular metric, and continuously aggregates metrics.
// We lazily create these instruments the first time a metric is seen, and reuse them to record subsequent measurements.
||||
gaugeInstruments map[string]otelmetric.Float64ObservableGauge |
||||
counterInstruments map[string]otelmetric.Float64Counter |
||||
histogramInstruments map[string]otelmetric.Float64Histogram |
||||
|
||||
// gaugeStore is required to hold the last-seen value of each gauge.
// This is a workaround, as OTEL currently does not have synchronous gauge instruments.
// It only allows the registration of "callbacks", which obtain values when the callback is invoked.
// We must hold gauge values until the callback is invoked at export time, after which the value can be removed.
||||
gaugeStore *gaugeStore |
||||
|
||||
mutex sync.Mutex |
||||
} |
||||
|
||||
// NewOTELReader returns a configured OTEL PeriodicReader that exports metrics at the given interval.
// It configures the reader with a custom OTELExporter that uses a MetricsClient to transform and export
// metrics in OTLP format to an external URL.
||||
func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader { |
||||
exporter := NewOTELExporter(client, url) |
||||
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) |
||||
} |
||||
|
||||
// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
|
||||
// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which
|
||||
// enable us to create OTEL Instruments to record measurements.
|
||||
func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { |
||||
if opts.Reader == nil { |
||||
return nil, fmt.Errorf("ferror: provide valid reader") |
||||
} |
||||
|
||||
if opts.Ctx == nil { |
||||
return nil, fmt.Errorf("ferror: provide valid context") |
||||
} |
||||
|
||||
logger := hclog.FromContext(opts.Ctx).Named("otel_sink") |
||||
|
||||
filterList, err := newFilterRegex(opts.Filters) |
||||
if err != nil { |
||||
logger.Error("Failed to initialize all filters", "error", err) |
||||
} |
||||
|
||||
attrs := make([]attribute.KeyValue, 0, len(opts.Labels)) |
||||
for k, v := range opts.Labels { |
||||
kv := attribute.KeyValue{ |
||||
Key: attribute.Key(k), |
||||
Value: attribute.StringValue(v), |
||||
} |
||||
attrs = append(attrs, kv) |
||||
} |
||||
// Set up the OTEL Metrics SDK to aggregate, convert and export metrics periodically.
||||
res := resource.NewWithAttributes("", attrs...) |
||||
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) |
||||
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") |
||||
|
||||
return &OTELSink{ |
||||
filters: filterList, |
||||
spaceReplacer: strings.NewReplacer(" ", "_"), |
||||
logger: logger, |
||||
meterProvider: meterProvider, |
||||
meter: &meter, |
||||
gaugeStore: NewGaugeStore(), |
||||
gaugeInstruments: make(map[string]otelmetric.Float64ObservableGauge, 0), |
||||
counterInstruments: make(map[string]otelmetric.Float64Counter, 0), |
||||
histogramInstruments: make(map[string]otelmetric.Float64Histogram, 0), |
||||
}, nil |
||||
} |
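// Editorial note: a minimal usage sketch (not part of the original change), assuming a
// MetricsClient and HCP URL are already available; the variable names below are placeholders.
//
//	reader := NewOTELReader(metricsClient, hcpURL, DefaultExportInterval)
//	sink, err := NewOTELSink(&OTELSinkOpts{
//		Ctx:     ctx,
//		Reader:  reader,
//		Labels:  map[string]string{"node_id": "server-1"},
//		Filters: []string{"raft"},
//	})
//	if err == nil {
//		// Register the sink as the global go-metrics backend.
//		gometrics.NewGlobal(gometrics.DefaultConfig("consul"), sink)
//	}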
||||
|
||||
// SetGauge emits a Consul gauge metric.
|
||||
func (o *OTELSink) SetGauge(key []string, val float32) { |
||||
o.SetGaugeWithLabels(key, val, nil) |
||||
} |
||||
|
||||
// AddSample emits a Consul histogram metric.
|
||||
func (o *OTELSink) AddSample(key []string, val float32) { |
||||
o.AddSampleWithLabels(key, val, nil) |
||||
} |
||||
|
||||
// IncrCounter emits a Consul counter metric.
|
||||
func (o *OTELSink) IncrCounter(key []string, val float32) { |
||||
o.IncrCounterWithLabels(key, val, nil) |
||||
} |
||||
|
||||
// SetGaugeWithLabels emits a Consul gauge metric with labels that gets
// registered by an OpenTelemetry Gauge instrument.
||||
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { |
||||
k := o.flattenKey(key) |
||||
|
||||
if !o.filters.MatchString(k) { |
||||
return |
||||
} |
||||
|
||||
// Set value in global Gauge store.
|
||||
o.gaugeStore.Set(k, float64(val), toAttributes(labels)) |
||||
|
||||
o.mutex.Lock() |
||||
defer o.mutex.Unlock() |
||||
|
||||
// If instrument does not exist, create it and register callback to emit last value in global Gauge store.
|
||||
if _, ok := o.gaugeInstruments[k]; !ok { |
||||
// The registration of a callback only needs to happen once, when the instrument is created.
|
||||
// The callback will be triggered every export cycle for that metric.
|
||||
// It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle.
|
||||
inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k))) |
||||
if err != nil { |
||||
o.logger.Error("Failed to create gauge instrument", "error", err) |
||||
return |
||||
} |
||||
o.gaugeInstruments[k] = inst |
||||
} |
||||
} |
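// Editorial note: the gauge path is indirect because OTEL has no synchronous gauge instrument.
// SetGaugeWithLabels above only stores the latest value in the gauge store; the value is read
// and exported when the registered callback fires during the reader's next collection cycle.
// Rough sequence (illustrative):
//
//	sink.SetGauge([]string{"consul", "raft", "leader"}, 1) // stores 1 in gaugeStore under "consul.raft.leader"
//	// ... later, the periodic reader collects, invoking gaugeCallback("consul.raft.leader"),
//	// which observes the stored value.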
||||
|
||||
// AddSampleWithLabels emits a Consul sample metric that gets registered by an OpenTelemetry Histogram instrument.
||||
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { |
||||
k := o.flattenKey(key) |
||||
|
||||
if !o.filters.MatchString(k) { |
||||
return |
||||
} |
||||
|
||||
o.mutex.Lock() |
||||
defer o.mutex.Unlock() |
||||
|
||||
inst, ok := o.histogramInstruments[k] |
||||
if !ok { |
||||
histogram, err := (*o.meter).Float64Histogram(k) |
||||
if err != nil { |
||||
o.logger.Error("Failed create histogram instrument", "error", err) |
||||
return |
||||
} |
||||
inst = histogram |
||||
o.histogramInstruments[k] = inst |
||||
} |
||||
|
||||
attrs := toAttributes(labels) |
||||
inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) |
||||
} |
||||
|
||||
// IncrCounterWithLabels emits a Consul counter metric that gets registered by an OpenTelemetry Counter instrument.
||||
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { |
||||
k := o.flattenKey(key) |
||||
|
||||
if !o.filters.MatchString(k) { |
||||
return |
||||
} |
||||
|
||||
o.mutex.Lock() |
||||
defer o.mutex.Unlock() |
||||
|
||||
inst, ok := o.counterInstruments[k] |
||||
if !ok { |
||||
counter, err := (*o.meter).Float64Counter(k) |
||||
if err != nil { |
||||
o.logger.Error("Failed to create counter instrument:", "error", err) |
||||
return |
||||
} |
||||
|
||||
inst = counter |
||||
o.counterInstruments[k] = inst |
||||
} |
||||
|
||||
attrs := toAttributes(labels) |
||||
inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...)) |
||||
} |
||||
|
||||
// EmitKey is unsupported and is a no-op.
||||
func (o *OTELSink) EmitKey(key []string, val float32) {} |
||||
|
||||
// flattenKey joins the key parts into a single dot-separated key and replaces any spaces with underscores.
||||
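// For example (illustrative): {"consul", "raft", "apply time"} becomes "consul.raft.apply_time".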
func (o *OTELSink) flattenKey(parts []string) string { |
||||
buf := &bytes.Buffer{} |
||||
joined := strings.Join(parts, ".") |
||||
|
||||
o.spaceReplacer.WriteString(buf, joined) |
||||
|
||||
return buf.String() |
||||
} |
||||
|
||||
// toAttributes converts go-metrics Labels into OTEL format []attribute.KeyValue.
||||
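// For example (illustrative): []gometrics.Label{{Name: "node_id", Value: "server-1"}}
// becomes a single attribute.KeyValue with key "node_id" and string value "server-1".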
func toAttributes(labels []gometrics.Label) []attribute.KeyValue { |
||||
if len(labels) == 0 { |
||||
return nil |
||||
} |
||||
attrs := make([]attribute.KeyValue, len(labels)) |
||||
for i, label := range labels { |
||||
attrs[i] = attribute.KeyValue{ |
||||
Key: attribute.Key(label.Name), |
||||
Value: attribute.StringValue(label.Value), |
||||
} |
||||
} |
||||
|
||||
return attrs |
||||
} |
@@ -0,0 +1,409 @@
|
||||
package telemetry |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"sort" |
||||
"strings" |
||||
"sync" |
||||
"testing" |
||||
|
||||
gometrics "github.com/armon/go-metrics" |
||||
"github.com/stretchr/testify/require" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/sdk/metric" |
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
) |
||||
|
||||
var ( |
||||
expectedResource = resource.NewWithAttributes("", attribute.KeyValue{ |
||||
Key: attribute.Key("node_id"), |
||||
Value: attribute.StringValue("test"), |
||||
}) |
||||
|
||||
attrs = attribute.NewSet(attribute.KeyValue{ |
||||
Key: attribute.Key("metric.label"), |
||||
Value: attribute.StringValue("test"), |
||||
}) |
||||
|
||||
expectedSinkMetrics = map[string]metricdata.Metrics{ |
||||
"consul.raft.leader": { |
||||
Name: "consul.raft.leader", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Gauge[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Value: float64(float32(0)), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"consul.autopilot.healthy": { |
||||
Name: "consul.autopilot.healthy", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Gauge[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: attrs, |
||||
Value: float64(float32(1.23)), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"consul.raft.state.leader": { |
||||
Name: "consul.raft.state.leader", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Sum[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Value: float64(float32(23.23)), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"consul.raft.apply": { |
||||
Name: "consul.raft.apply", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Sum[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: attrs, |
||||
Value: float64(float32(1.44)), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"consul.raft.leader.lastContact": { |
||||
Name: "consul.raft.leader.lastContact", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Histogram[float64]{ |
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Count: 1, |
||||
Sum: float64(float32(45.32)), |
||||
Min: metricdata.NewExtrema(float64(float32(45.32))), |
||||
Max: metricdata.NewExtrema(float64(float32(45.32))), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"consul.raft.commitTime": { |
||||
Name: "consul.raft.commitTime", |
||||
Description: "", |
||||
Unit: "", |
||||
Data: metricdata.Histogram[float64]{ |
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{ |
||||
{ |
||||
Attributes: attrs, |
||||
Count: 1, |
||||
Sum: float64(float32(26.34)), |
||||
Min: metricdata.NewExtrema(float64(float32(26.34))), |
||||
Max: metricdata.NewExtrema(float64(float32(26.34))), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
) |
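// Editorial note on the expected values above: the go-metrics sink API accepts float32,
// so expected values are written as float64(float32(x)) to match the precision of the
// values the sink records and the reader collects.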
||||
|
||||
func TestNewOTELSink(t *testing.T) { |
||||
t.Parallel() |
||||
for name, test := range map[string]struct { |
||||
wantErr string |
||||
opts *OTELSinkOpts |
||||
}{ |
||||
"failsWithEmptyLogger": { |
||||
wantErr: "ferror: provide valid context", |
||||
opts: &OTELSinkOpts{ |
||||
Reader: metric.NewManualReader(), |
||||
}, |
||||
}, |
||||
"failsWithEmptyReader": { |
||||
wantErr: "ferror: provide valid reader", |
||||
opts: &OTELSinkOpts{ |
||||
Reader: nil, |
||||
Ctx: context.Background(), |
||||
}, |
||||
}, |
||||
"success": { |
||||
opts: &OTELSinkOpts{ |
||||
Ctx: context.Background(), |
||||
Reader: metric.NewManualReader(), |
||||
Labels: map[string]string{ |
||||
"server": "test", |
||||
}, |
||||
Filters: []string{"raft"}, |
||||
}, |
||||
}, |
||||
} { |
||||
test := test |
||||
t.Run(name, func(t *testing.T) { |
||||
t.Parallel() |
||||
sink, err := NewOTELSink(test.opts) |
||||
if test.wantErr != "" { |
||||
require.Error(t, err) |
||||
require.Contains(t, err.Error(), test.wantErr) |
||||
return |
||||
} |
||||
|
||||
require.NotNil(t, sink) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestOTELSink(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
// Manual reader outputs the aggregated metrics when reader.Collect is called.
|
||||
reader := metric.NewManualReader() |
||||
|
||||
ctx := context.Background() |
||||
opts := &OTELSinkOpts{ |
||||
Reader: reader, |
||||
Ctx: ctx, |
||||
Filters: []string{"raft", "autopilot"}, |
||||
Labels: map[string]string{ |
||||
"node_id": "test", |
||||
}, |
||||
} |
||||
|
||||
sink, err := NewOTELSink(opts) |
||||
require.NoError(t, err) |
||||
|
||||
labels := []gometrics.Label{ |
||||
{ |
||||
Name: "metric.label", |
||||
Value: "test", |
||||
}, |
||||
} |
||||
|
||||
sink.SetGauge([]string{"consul", "raft", "leader"}, float32(0)) |
||||
sink.SetGaugeWithLabels([]string{"consul", "autopilot", "healthy"}, float32(1.23), labels) |
||||
|
||||
sink.IncrCounter([]string{"consul", "raft", "state", "leader"}, float32(23.23)) |
||||
sink.IncrCounterWithLabels([]string{"consul", "raft", "apply"}, float32(1.44), labels) |
||||
|
||||
sink.AddSample([]string{"consul", "raft", "leader", "lastContact"}, float32(45.32)) |
||||
sink.AddSampleWithLabels([]string{"consul", "raft", "commitTime"}, float32(26.34), labels) |
||||
|
||||
var collected metricdata.ResourceMetrics |
||||
err = reader.Collect(ctx, &collected) |
||||
require.NoError(t, err) |
||||
|
||||
isSame(t, expectedSinkMetrics, collected) |
||||
} |
||||
|
||||
func TestOTELSink_Race(t *testing.T) { |
||||
reader := metric.NewManualReader() |
||||
ctx := context.Background() |
||||
opts := &OTELSinkOpts{ |
||||
Ctx: ctx, |
||||
Reader: reader, |
||||
Labels: map[string]string{ |
||||
"node_id": "test", |
||||
}, |
||||
Filters: []string{"test"}, |
||||
} |
||||
|
||||
sink, err := NewOTELSink(opts) |
||||
require.NoError(t, err) |
||||
|
||||
samples := 100 |
||||
expectedMetrics := generateSamples(samples) |
||||
wg := &sync.WaitGroup{} |
||||
errCh := make(chan error, samples) |
||||
for k, v := range expectedMetrics { |
||||
wg.Add(1) |
||||
go func(k string, v metricdata.Metrics) { |
||||
defer wg.Done() |
||||
performSinkOperation(sink, k, v, errCh) |
||||
}(k, v) |
||||
} |
||||
wg.Wait() |
||||
|
||||
require.Empty(t, errCh) |
||||
|
||||
var collected metricdata.ResourceMetrics |
||||
err = reader.Collect(ctx, &collected) |
||||
require.NoError(t, err) |
||||
|
||||
isSame(t, expectedMetrics, collected) |
||||
} |
||||
|
||||
// generateSamples generates n samples of each of the gauge, counter and histogram measurement types for test purposes.
||||
func generateSamples(n int) map[string]metricdata.Metrics { |
||||
generated := make(map[string]metricdata.Metrics, 3*n) |
||||
|
||||
for i := 0; i < n; i++ { |
||||
v := 12.3 |
||||
k := fmt.Sprintf("consul.test.gauges.%d", i) |
||||
generated[k] = metricdata.Metrics{ |
||||
Name: k, |
||||
Data: metricdata.Gauge[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Value: float64(float32(v)), |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
for i := 0; i < n; i++ { |
||||
v := 22.23 |
||||
k := fmt.Sprintf("consul.test.sum.%d", i) |
||||
generated[k] = metricdata.Metrics{ |
||||
Name: k, |
||||
Data: metricdata.Sum[float64]{ |
||||
DataPoints: []metricdata.DataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Value: float64(float32(v)), |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
} |
||||
|
||||
for i := 0; i < n; i++ { |
||||
v := 13.24 |
||||
k := fmt.Sprintf("consul.test.hist.%d", i) |
||||
generated[k] = metricdata.Metrics{ |
||||
Name: k, |
||||
Data: metricdata.Histogram[float64]{ |
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{ |
||||
{ |
||||
Attributes: *attribute.EmptySet(), |
||||
Sum: float64(float32(v)), |
||||
Max: metricdata.NewExtrema(float64(float32(v))), |
||||
Min: metricdata.NewExtrema(float64(float32(v))), |
||||
Count: 1, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
return generated |
||||
} |
||||
|
||||
// performSinkOperation emits a measurement using the OTELSink and reports any failure on errCh.
||||
func performSinkOperation(sink *OTELSink, k string, v metricdata.Metrics, errCh chan error) {
	key := strings.Split(k, ".")
	switch data := v.Data.(type) {
	case metricdata.Gauge[float64]:
		sink.SetGauge(key, float32(data.DataPoints[0].Value))
	case metricdata.Sum[float64]:
		sink.IncrCounter(key, float32(data.DataPoints[0].Value))
	case metricdata.Histogram[float64]:
		sink.AddSample(key, float32(data.DataPoints[0].Sum))
	default:
		errCh <- fmt.Errorf("unexpected metric type for key: %s", k)
	}
}
||||
|
||||
func isSame(t *testing.T, expectedMap map[string]metricdata.Metrics, actual metricdata.ResourceMetrics) { |
||||
// Validate resource
|
||||
require.Equal(t, expectedResource, actual.Resource) |
||||
|
||||
// Validate Metrics
|
||||
require.NotEmpty(t, actual.ScopeMetrics) |
||||
actualMetrics := actual.ScopeMetrics[0].Metrics |
||||
require.Equal(t, len(expectedMap), len(actualMetrics)) |
||||
|
||||
for _, actual := range actualMetrics { |
||||
name := actual.Name |
||||
expected, ok := expectedMap[actual.Name] |
||||
require.True(t, ok, "metric key %s should be in expectedMetrics map", name) |
||||
isSameMetrics(t, expected, actual) |
||||
} |
||||
} |
||||
|
||||
// isSameMetrics verifies that two metricdata.Metrics objects are equal, ignoring the time component.
// Avoid duplicate datapoint values to ensure a predictable sort order.
||||
func isSameMetrics(t *testing.T, expected metricdata.Metrics, actual metricdata.Metrics) { |
||||
require.Equal(t, expected.Name, actual.Name, "different .Name field") |
||||
require.Equal(t, expected.Description, actual.Description, "different .Description field") |
||||
require.Equal(t, expected.Unit, actual.Unit, "different .Unit field") |
||||
|
||||
switch expectedData := expected.Data.(type) { |
||||
case metricdata.Gauge[float64]: |
||||
actualData, ok := actual.Data.(metricdata.Gauge[float64]) |
||||
require.True(t, ok, "different metric types: expected metricdata.Gauge[float64]") |
||||
|
||||
isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints) |
||||
case metricdata.Sum[float64]: |
||||
actualData, ok := actual.Data.(metricdata.Sum[float64]) |
||||
require.True(t, ok, "different metric types: expected metricdata.Sum[float64]") |
||||
|
||||
isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints) |
||||
case metricdata.Histogram[float64]: |
||||
actualData, ok := actual.Data.(metricdata.Histogram[float64]) |
||||
require.True(t, ok, "different metric types: expected metricdata.Histogram") |
||||
|
||||
isSameHistogramData(t, expectedData.DataPoints, actualData.DataPoints) |
||||
} |
||||
} |
||||
|
||||
func isSameDataPoint(t *testing.T, expected []metricdata.DataPoint[float64], actual []metricdata.DataPoint[float64]) { |
||||
require.Equal(t, len(expected), len(actual), "different datapoints length") |
||||
|
||||
// Sort for predictable data in order of lowest value.
|
||||
sort.Slice(expected, func(i, j int) bool { |
||||
return expected[i].Value < expected[j].Value |
||||
}) |
||||
sort.Slice(actual, func(i, j int) bool {
return actual[i].Value < actual[j].Value
})
||||
|
||||
// Only verify the value and attributes.
|
||||
for i, dp := range expected { |
||||
currActual := actual[i] |
||||
require.Equal(t, dp.Value, currActual.Value, "different datapoint value") |
||||
require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes") |
||||
} |
||||
} |
||||
|
||||
func isSameHistogramData(t *testing.T, expected []metricdata.HistogramDataPoint[float64], actual []metricdata.HistogramDataPoint[float64]) { |
||||
require.Equal(t, len(expected), len(actual), "different histogram datapoint length") |
||||
|
||||
// Sort for predictable data in order of lowest sum.
|
||||
sort.Slice(expected, func(i, j int) bool { |
||||
return expected[i].Sum < expected[j].Sum |
||||
}) |
||||
sort.Slice(actual, func(i, j int) bool {
return actual[i].Sum < actual[j].Sum
})
||||
|
||||
// Only verify the value and the attributes.
|
||||
for i, dp := range expected { |
||||
currActual := actual[i] |
||||
require.Equal(t, dp.Sum, currActual.Sum, "different histogram datapoint .Sum value") |
||||
require.Equal(t, dp.Max, currActual.Max, "different histogram datapoint .Max value") |
||||
require.Equal(t, dp.Min, currActual.Min, "different histogram datapoint .Min value") |
||||
require.Equal(t, dp.Count, currActual.Count, "different histogram datapoint .Count value") |
||||
require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes") |
||||
} |
||||
} |
@@ -0,0 +1,186 @@
|
||||
package telemetry |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
|
||||
goMetrics "github.com/armon/go-metrics" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata" |
||||
cpb "go.opentelemetry.io/proto/otlp/common/v1" |
||||
mpb "go.opentelemetry.io/proto/otlp/metrics/v1" |
||||
rpb "go.opentelemetry.io/proto/otlp/resource/v1" |
||||
) |
||||
|
||||
var ( |
||||
aggregationErr = errors.New("unsupported aggregation") |
||||
temporalityErr = errors.New("unsupported temporality") |
||||
) |
||||
|
||||
// isEmpty reports whether the given OTLP protobuf ResourceMetrics contains no metric data.
// It returns true if no ScopeMetrics exist or all metrics within the ScopeMetrics are empty.
||||
func isEmpty(rm *mpb.ResourceMetrics) bool { |
||||
// No ScopeMetrics
|
||||
if len(rm.ScopeMetrics) == 0 { |
||||
return true |
||||
} |
||||
|
||||
// If any inner metrics contain data, return false.
|
||||
for _, v := range rm.ScopeMetrics { |
||||
if len(v.Metrics) != 0 { |
||||
return false |
||||
} |
||||
} |
||||
|
||||
// All inner metrics are empty.
|
||||
return true |
||||
} |
||||
|
||||
// transformOTLP returns an OTLP ResourceMetrics generated from OTEL SDK metrics. If rm
// contains metrics that cannot be converted, those metrics are dropped (and counted via an
// internal failure metric), so the returned ResourceMetrics may contain partial ScopeMetrics.
||||
func transformOTLP(rm *metricdata.ResourceMetrics) *mpb.ResourceMetrics { |
||||
sms := scopeMetricsToPB(rm.ScopeMetrics) |
||||
return &mpb.ResourceMetrics{ |
||||
Resource: &rpb.Resource{ |
||||
Attributes: attributesToPB(rm.Resource.Iter()), |
||||
}, |
||||
ScopeMetrics: sms, |
||||
} |
||||
} |
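// Editorial note: a sketch of the intended call pattern, assuming the exporter converts
// collected SDK metrics and skips the network call when nothing was recorded:
//
//	otlpMetrics := transformOTLP(&collectedResourceMetrics)
//	if isEmpty(otlpMetrics) {
//		return nil // nothing to export this cycle
//	}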
||||
|
||||
// scopeMetricsToPB returns a slice of OTLP ScopeMetrics generated from OTEL SDK ones.
||||
func scopeMetricsToPB(scopeMetrics []metricdata.ScopeMetrics) []*mpb.ScopeMetrics { |
||||
out := make([]*mpb.ScopeMetrics, 0, len(scopeMetrics)) |
||||
for _, sm := range scopeMetrics { |
||||
ms := metricsToPB(sm.Metrics) |
||||
out = append(out, &mpb.ScopeMetrics{ |
||||
Scope: &cpb.InstrumentationScope{ |
||||
Name: sm.Scope.Name, |
||||
Version: sm.Scope.Version, |
||||
}, |
||||
Metrics: ms, |
||||
}) |
||||
} |
||||
return out |
||||
} |
||||
|
||||
// metricsToPB returns a slice of OTLP Metrics generated from OTEL metrics SDK ones, skipping any metric that fails to convert.
||||
func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric { |
||||
out := make([]*mpb.Metric, 0, len(metrics)) |
||||
for _, m := range metrics { |
||||
o, err := metricTypeToPB(m) |
||||
if err != nil { |
||||
goMetrics.IncrCounter(internalMetricTransformFailure, 1) |
||||
continue |
||||
} |
||||
out = append(out, o) |
||||
} |
||||
return out |
||||
} |
||||
|
||||
// metricTypeToPB identifies the instrument type and converts it to OTLP format.
// Only float64 values are accepted, since the Go Metrics sink only receives float64 values.
||||
func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) { |
||||
out := &mpb.Metric{ |
||||
Name: m.Name, |
||||
Description: m.Description, |
||||
Unit: m.Unit, |
||||
} |
||||
switch a := m.Data.(type) { |
||||
case metricdata.Gauge[float64]: |
||||
out.Data = &mpb.Metric_Gauge{ |
||||
Gauge: &mpb.Gauge{ |
||||
DataPoints: dataPointsToPB(a.DataPoints), |
||||
}, |
||||
} |
||||
case metricdata.Sum[float64]: |
||||
if a.Temporality != metricdata.CumulativeTemporality { |
||||
return out, fmt.Errorf("error: %w: %T", temporalityErr, a) |
||||
} |
||||
out.Data = &mpb.Metric_Sum{ |
||||
Sum: &mpb.Sum{ |
||||
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, |
||||
IsMonotonic: a.IsMonotonic, |
||||
DataPoints: dataPointsToPB(a.DataPoints), |
||||
}, |
||||
} |
||||
case metricdata.Histogram[float64]: |
||||
if a.Temporality != metricdata.CumulativeTemporality { |
||||
return out, fmt.Errorf("error: %w: %T", temporalityErr, a) |
||||
} |
||||
out.Data = &mpb.Metric_Histogram{ |
||||
Histogram: &mpb.Histogram{ |
||||
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, |
||||
DataPoints: histogramDataPointsToPB(a.DataPoints), |
||||
}, |
||||
} |
||||
default: |
||||
return out, fmt.Errorf("error: %w: %T", aggregationErr, a) |
||||
} |
||||
return out, nil |
||||
} |
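// Summary of the conversion above (descriptive, no new behavior):
//   - Gauge[float64]                -> mpb.Metric_Gauge
//   - cumulative Sum[float64]       -> mpb.Metric_Sum (cumulative, monotonicity preserved)
//   - cumulative Histogram[float64] -> mpb.Metric_Histogram (cumulative)
// Delta temporalities and any other aggregation types are rejected with an error and are
// counted as transform failures by the caller.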
||||
|
||||
// dataPointsToPB returns a slice of OTLP NumberDataPoints generated from OTEL metrics SDK ones.
||||
func dataPointsToPB(dataPoints []metricdata.DataPoint[float64]) []*mpb.NumberDataPoint { |
||||
out := make([]*mpb.NumberDataPoint, 0, len(dataPoints)) |
||||
for _, dp := range dataPoints { |
||||
ndp := &mpb.NumberDataPoint{ |
||||
Attributes: attributesToPB(dp.Attributes.Iter()), |
||||
StartTimeUnixNano: uint64(dp.StartTime.UnixNano()), |
||||
TimeUnixNano: uint64(dp.Time.UnixNano()), |
||||
} |
||||
|
||||
ndp.Value = &mpb.NumberDataPoint_AsDouble{ |
||||
AsDouble: dp.Value, |
||||
} |
||||
out = append(out, ndp) |
||||
} |
||||
return out |
||||
} |
||||
|
||||
// histogramDataPointsToPB returns a slice of OTLP HistogramDataPoints generated from OTEL metrics SDK ones.
||||
func histogramDataPointsToPB(dataPoints []metricdata.HistogramDataPoint[float64]) []*mpb.HistogramDataPoint { |
||||
out := make([]*mpb.HistogramDataPoint, 0, len(dataPoints)) |
||||
for _, dp := range dataPoints { |
||||
sum := dp.Sum |
||||
hdp := &mpb.HistogramDataPoint{ |
||||
Attributes: attributesToPB(dp.Attributes.Iter()), |
||||
StartTimeUnixNano: uint64(dp.StartTime.UnixNano()), |
||||
TimeUnixNano: uint64(dp.Time.UnixNano()), |
||||
Count: dp.Count, |
||||
Sum: &sum, |
||||
BucketCounts: dp.BucketCounts, |
||||
ExplicitBounds: dp.Bounds, |
||||
} |
||||
if v, ok := dp.Min.Value(); ok { |
||||
hdp.Min = &v |
||||
} |
||||
if v, ok := dp.Max.Value(); ok { |
||||
hdp.Max = &v |
||||
} |
||||
out = append(out, hdp) |
||||
} |
||||
return out |
||||
} |
||||
|
||||
// attributesToPB transforms items of an attribute iterator into OTLP key-values.
||||
// Currently, labels are only <string, string> key-value pairs.
|
||||
func attributesToPB(iter attribute.Iterator) []*cpb.KeyValue { |
||||
l := iter.Len()
if l == 0 {
||||
return nil |
||||
} |
||||
|
||||
out := make([]*cpb.KeyValue, 0, l) |
||||
for iter.Next() { |
||||
kv := iter.Attribute() |
||||
av := &cpb.AnyValue{ |
||||
Value: &cpb.AnyValue_StringValue{ |
||||
StringValue: kv.Value.AsString(), |
||||
}, |
||||
} |
||||
out = append(out, &cpb.KeyValue{Key: string(kv.Key), Value: av}) |
||||
} |
||||
return out |
||||
} |
@@ -0,0 +1,342 @@
|
||||
package telemetry |
||||
|
||||
import ( |
||||
"strings" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/armon/go-metrics" |
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/sdk/instrumentation" |
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0" |
||||
cpb "go.opentelemetry.io/proto/otlp/common/v1" |
||||
mpb "go.opentelemetry.io/proto/otlp/metrics/v1" |
||||
rpb "go.opentelemetry.io/proto/otlp/resource/v1" |
||||
) |
||||
|
||||
var ( |
||||
// Common attributes for test cases.
|
||||
start = time.Date(2000, time.January, 01, 0, 0, 0, 0, time.FixedZone("GMT", 0)) |
||||
end = start.Add(30 * time.Second) |
||||
|
||||
alice = attribute.NewSet(attribute.String("user", "alice")) |
||||
bob = attribute.NewSet(attribute.String("user", "bob")) |
||||
|
||||
pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ |
||||
Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, |
||||
}} |
||||
pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ |
||||
Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, |
||||
}} |
||||
|
||||
// DataPoint test case : Histogram Datapoints (Histogram)
|
||||
minA, maxA, sumA = 2.0, 4.0, 90.0 |
||||
minB, maxB, sumB = 4.0, 150.0, 234.0 |
||||
inputHDP = []metricdata.HistogramDataPoint[float64]{{ |
||||
Attributes: alice, |
||||
StartTime: start, |
||||
Time: end, |
||||
Count: 30, |
||||
Bounds: []float64{1, 5}, |
||||
BucketCounts: []uint64{0, 30, 0}, |
||||
Min: metricdata.NewExtrema(minA), |
||||
Max: metricdata.NewExtrema(maxA), |
||||
Sum: sumA, |
||||
}, { |
||||
Attributes: bob, |
||||
StartTime: start, |
||||
Time: end, |
||||
Count: 3, |
||||
Bounds: []float64{1, 5}, |
||||
BucketCounts: []uint64{0, 1, 2}, |
||||
Min: metricdata.NewExtrema(minB), |
||||
Max: metricdata.NewExtrema(maxB), |
||||
Sum: sumB, |
||||
}} |
||||
|
||||
expectedHDP = []*mpb.HistogramDataPoint{{ |
||||
Attributes: []*cpb.KeyValue{pbAlice}, |
||||
StartTimeUnixNano: uint64(start.UnixNano()), |
||||
TimeUnixNano: uint64(end.UnixNano()), |
||||
Count: 30, |
||||
Sum: &sumA, |
||||
ExplicitBounds: []float64{1, 5}, |
||||
BucketCounts: []uint64{0, 30, 0}, |
||||
Min: &minA, |
||||
Max: &maxA, |
||||
}, { |
||||
Attributes: []*cpb.KeyValue{pbBob}, |
||||
StartTimeUnixNano: uint64(start.UnixNano()), |
||||
TimeUnixNano: uint64(end.UnixNano()), |
||||
Count: 3, |
||||
Sum: &sumB, |
||||
ExplicitBounds: []float64{1, 5}, |
||||
BucketCounts: []uint64{0, 1, 2}, |
||||
Min: &minB, |
||||
Max: &maxB, |
||||
}} |
||||
// DataPoint test case : Number Datapoints (Gauge / Counter)
|
||||
inputDP = []metricdata.DataPoint[float64]{ |
||||
{Attributes: alice, StartTime: start, Time: end, Value: 1.0}, |
||||
{Attributes: bob, StartTime: start, Time: end, Value: 2.0}, |
||||
} |
||||
|
||||
expectedDP = []*mpb.NumberDataPoint{ |
||||
{ |
||||
Attributes: []*cpb.KeyValue{pbAlice}, |
||||
StartTimeUnixNano: uint64(start.UnixNano()), |
||||
TimeUnixNano: uint64(end.UnixNano()), |
||||
Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 1.0}, |
||||
}, |
||||
{ |
||||
Attributes: []*cpb.KeyValue{pbBob}, |
||||
StartTimeUnixNano: uint64(start.UnixNano()), |
||||
TimeUnixNano: uint64(end.UnixNano()), |
||||
Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 2.0}, |
||||
}, |
||||
} |
||||
|
||||
invalidSumTemporality = metricdata.Metrics{ |
||||
Name: "invalid-sum", |
||||
Description: "Sum with invalid temporality", |
||||
Unit: "1", |
||||
Data: metricdata.Sum[float64]{ |
||||
Temporality: metricdata.DeltaTemporality, |
||||
IsMonotonic: false, |
||||
DataPoints: inputDP, |
||||
}, |
||||
} |
||||
|
||||
invalidSumAgg = metricdata.Metrics{ |
||||
Name: "unknown", |
||||
Description: "Unknown aggregation", |
||||
Unit: "1", |
||||
Data: metricdata.Sum[int64]{}, |
||||
} |
||||
|
||||
invalidHistTemporality = metricdata.Metrics{ |
||||
Name: "invalid-histogram", |
||||
Description: "Invalid histogram", |
||||
Unit: "1", |
||||
Data: metricdata.Histogram[float64]{ |
||||
Temporality: metricdata.DeltaTemporality, |
||||
DataPoints: inputHDP, |
||||
}, |
||||
} |
||||
|
||||
validFloat64Gauge = metricdata.Metrics{ |
||||
Name: "float64-gauge", |
||||
Description: "Gauge with float64 values", |
||||
Unit: "1", |
||||
Data: metricdata.Gauge[float64]{DataPoints: inputDP}, |
||||
} |
||||
|
||||
validFloat64Sum = metricdata.Metrics{ |
||||
Name: "float64-sum", |
||||
Description: "Sum with float64 values", |
||||
Unit: "1", |
||||
Data: metricdata.Sum[float64]{ |
||||
Temporality: metricdata.CumulativeTemporality, |
||||
IsMonotonic: false, |
||||
DataPoints: inputDP, |
||||
}, |
||||
} |
||||
|
||||
validFloat64Hist = metricdata.Metrics{ |
||||
Name: "float64-histogram", |
||||
Description: "Histogram", |
||||
Unit: "1", |
||||
Data: metricdata.Histogram[float64]{ |
||||
Temporality: metricdata.CumulativeTemporality, |
||||
DataPoints: inputHDP, |
||||
}, |
||||
} |
||||
|
||||
// Metrics test case:
// - 3 invalid metrics and 3 valid metrics, to test filtering
// - 1 invalid metric type
// - 2 invalid temporalities (only cumulative is supported)
// - 3 supported types (Gauge, Sum and Histogram)
||||
inputMetrics = []metricdata.Metrics{ |
||||
validFloat64Gauge, |
||||
validFloat64Sum, |
||||
validFloat64Hist, |
||||
invalidSumTemporality, |
||||
invalidHistTemporality, |
||||
invalidSumAgg, |
||||
} |
||||
|
||||
expectedMetrics = []*mpb.Metric{ |
||||
{ |
||||
Name: "float64-gauge", |
||||
Description: "Gauge with float64 values", |
||||
Unit: "1", |
||||
Data: &mpb.Metric_Gauge{Gauge: &mpb.Gauge{DataPoints: expectedDP}}, |
||||
}, |
||||
{ |
||||
Name: "float64-sum", |
||||
Description: "Sum with float64 values", |
||||
Unit: "1", |
||||
Data: &mpb.Metric_Sum{Sum: &mpb.Sum{ |
||||
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, |
||||
IsMonotonic: false, |
||||
DataPoints: expectedDP, |
||||
}}, |
||||
}, |
||||
{ |
||||
Name: "float64-histogram", |
||||
Description: "Histogram", |
||||
Unit: "1", |
||||
Data: &mpb.Metric_Histogram{Histogram: &mpb.Histogram{ |
||||
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, |
||||
DataPoints: expectedHDP, |
||||
}}, |
||||
}, |
||||
} |
||||
|
||||
// ScopeMetrics Test Cases
|
||||
inputScopeMetrics = []metricdata.ScopeMetrics{{ |
||||
Scope: instrumentation.Scope{ |
||||
Name: "test/code/path", |
||||
Version: "v0.1.0", |
||||
}, |
||||
Metrics: inputMetrics, |
||||
}} |
||||
|
||||
expectedScopeMetrics = []*mpb.ScopeMetrics{{ |
||||
Scope: &cpb.InstrumentationScope{ |
||||
Name: "test/code/path", |
||||
Version: "v0.1.0", |
||||
}, |
||||
Metrics: expectedMetrics, |
||||
}} |
||||
|
||||
// ResourceMetrics Test Cases
|
||||
inputResourceMetrics = &metricdata.ResourceMetrics{ |
||||
Resource: resource.NewSchemaless( |
||||
semconv.ServiceName("test server"), |
||||
semconv.ServiceVersion("v0.1.0"), |
||||
), |
||||
ScopeMetrics: inputScopeMetrics, |
||||
} |
||||
|
||||
expectedResourceMetrics = &mpb.ResourceMetrics{ |
||||
Resource: &rpb.Resource{ |
||||
Attributes: []*cpb.KeyValue{ |
||||
{ |
||||
Key: "service.name", |
||||
Value: &cpb.AnyValue{ |
||||
Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, |
||||
}, |
||||
}, |
||||
{ |
||||
Key: "service.version", |
||||
Value: &cpb.AnyValue{ |
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
ScopeMetrics: expectedScopeMetrics, |
||||
} |
||||
) |
||||
|
||||
// TestTransformOTLP runs tests from the "bottom-up" of the metricdata data types.
|
||||
func TestTransformOTLP(t *testing.T) { |
||||
t.Parallel() |
||||
// Histogram DataPoint Test Case (Histograms)
|
||||
assert.Equal(t, expectedHDP, histogramDataPointsToPB(inputHDP)) |
||||
|
||||
// Number DataPoint Test Case (Counters / Gauges)
|
||||
require.Equal(t, expectedDP, dataPointsToPB(inputDP)) |
||||
|
||||
// MetricType Error Test Cases
|
||||
_, err := metricTypeToPB(invalidHistTemporality) |
||||
require.Error(t, err) |
||||
require.ErrorIs(t, err, temporalityErr) |
||||
|
||||
_, err = metricTypeToPB(invalidSumTemporality) |
||||
require.Error(t, err) |
||||
require.ErrorIs(t, err, temporalityErr) |
||||
|
||||
_, err = metricTypeToPB(invalidSumAgg) |
||||
require.Error(t, err) |
||||
require.ErrorIs(t, err, aggregationErr) |
||||
|
||||
// Metrics Test Case
|
||||
m := metricsToPB(inputMetrics) |
||||
require.Equal(t, expectedMetrics, m) |
||||
require.Len(t, m, 3)
||||
|
||||
// Scope Metrics Test Case
|
||||
sm := scopeMetricsToPB(inputScopeMetrics) |
||||
require.Equal(t, expectedScopeMetrics, sm) |
||||
|
||||
// Resource Metrics Test Case
||||
rm := transformOTLP(inputResourceMetrics) |
||||
require.Equal(t, expectedResourceMetrics, rm) |
||||
} |
||||
|
||||
// TestTransformOTLP_CustomMetrics tests that a custom metric (hcp.otel.transform.failure) is emitted
|
||||
// when transform fails. This test cannot be run in parallel as the metrics.NewGlobal()
|
||||
// sets a shared global sink.
|
||||
func TestTransformOTLP_CustomMetrics(t *testing.T) { |
||||
for name, tc := range map[string]struct { |
||||
inputRM *metricdata.ResourceMetrics |
||||
expectedMetricCount int |
||||
}{ |
||||
"successNoMetric": { |
||||
inputRM: &metricdata.ResourceMetrics{ |
||||
// 3 valid metrics.
|
||||
ScopeMetrics: []metricdata.ScopeMetrics{ |
||||
{ |
||||
Metrics: []metricdata.Metrics{ |
||||
validFloat64Gauge, |
||||
validFloat64Hist, |
||||
validFloat64Sum, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"failureEmitsMetric": { |
||||
// inputScopeMetrics contains 3 bad metrics.
|
||||
inputRM: inputResourceMetrics, |
||||
expectedMetricCount: 3, |
||||
}, |
||||
} { |
||||
tc := tc |
||||
t.Run(name, func(t *testing.T) { |
||||
// Init global sink.
|
||||
serviceName := "test.transform" |
||||
cfg := metrics.DefaultConfig(serviceName) |
||||
cfg.EnableHostname = false |
||||
|
||||
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) |
||||
metrics.NewGlobal(cfg, sink) |
||||
|
||||
// Perform operation that emits metric.
|
||||
transformOTLP(tc.inputRM) |
||||
|
||||
// Collect sink metrics.
|
||||
intervals := sink.Data() |
||||
require.Len(t, intervals, 1) |
||||
key := serviceName + "." + strings.Join(internalMetricTransformFailure, ".") |
||||
sv := intervals[0].Counters[key] |
||||
|
||||
if tc.expectedMetricCount == 0 { |
||||
require.Empty(t, sv) |
||||
return |
||||
} |
||||
|
||||
// Verify count for transform failure metric.
|
||||
require.NotNil(t, sv) |
||||
require.NotNil(t, sv.AggregateSample) |
||||
require.Equal(t, 3, sv.AggregateSample.Count) |
||||
}) |
||||
} |
||||
} |