Browse Source

Manual Backport of [HCP Telemetry] Move first TelemetryConfig Fetch into the TelemetryConfigProvider into release/1.15.x (#18627)

* [HCP Telemetry] Move first TelemetryConfig Fetch into the TelemetryConfigProvider (#18318)

* Add Enabler interface to turn sink on/off

* Use h for hcpProviderImpl vars, fix PR feedback and fix errors

* Keep nil check in exporter and fix tests

* Clarify comment and fix function name

* Use disable instead of enable

* Fix errors nit in otlp_transform

* Add test for refreshInterval of updateConfig

* Add disabled field in MetricsConfig struct

* Fix PR feedback: improve comment and remove double colons

* Fix deps test which requires a maybe

* Update hcp-sdk-go to v0.61.0

* use disabled flag in telemetry_config.go

* Handle 4XX errors in telemetry_provider

* Fix deps test

* Check 4XX instead

* Run make go-mod-tidy

* Delete test-integ
pull/18632/head
Ashvitha 1 year ago committed by GitHub
parent
commit
4f4a95d458
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      agent/hcp/client/telemetry_config.go
  2. 47
      agent/hcp/client/telemetry_config_test.go
  3. 39
      agent/hcp/deps.go
  4. 84
      agent/hcp/deps_test.go
  5. 6
      agent/hcp/telemetry/otel_exporter.go
  6. 17
      agent/hcp/telemetry/otel_exporter_test.go
  7. 22
      agent/hcp/telemetry/otel_sink.go
  8. 32
      agent/hcp/telemetry/otel_sink_test.go
  9. 10
      agent/hcp/telemetry/otlp_transform.go
  10. 6
      agent/hcp/telemetry/otlp_transform_test.go
  11. 112
      agent/hcp/telemetry_provider.go
  12. 208
      agent/hcp/telemetry_provider_test.go
  13. 3
      go.mod
  14. 6
      go.sum

19
agent/hcp/client/telemetry_config.go

@ -17,7 +17,7 @@ import (
var (
// defaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
DefaultMetricFilters = regexp.MustCompile(".+")
// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
@ -26,6 +26,7 @@ var (
errMissingMetricsConfig = errors.New("missing metrics config")
errInvalidRefreshInterval = errors.New("invalid refresh interval")
errInvalidEndpoint = errors.New("invalid metrics endpoint")
errEmptyEndpoint = errors.New("empty metrics endpoint")
)
// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
@ -40,6 +41,7 @@ type MetricsConfig struct {
Labels map[string]string
Filters *regexp.Regexp
Endpoint *url.URL
Disabled bool
}
// RefreshConfig contains configuration for the periodic fetch of configuration from HCP.
@ -47,11 +49,6 @@ type RefreshConfig struct {
RefreshInterval time.Duration
}
// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists.
func (t *TelemetryConfig) MetricsEnabled() bool {
return t.MetricsConfig.Endpoint != nil
}
// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid.
func validateAgentTelemetryConfigPayload(resp *hcptelemetry.AgentTelemetryConfigOK) error {
if resp.Payload == nil {
@ -83,7 +80,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
telemetryConfig := resp.Payload.TelemetryConfig
metricsEndpoint, err := convertMetricEndpoint(telemetryConfig.Endpoint, telemetryConfig.Metrics.Endpoint)
if err != nil {
return nil, errInvalidEndpoint
return nil, err
}
metricsFilters := convertMetricFilters(ctx, telemetryConfig.Metrics.IncludeList)
@ -94,6 +91,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
Endpoint: metricsEndpoint,
Labels: metricLabels,
Filters: metricsFilters,
Disabled: telemetryConfig.Metrics.Disabled,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: refreshInterval,
@ -111,9 +109,8 @@ func convertMetricEndpoint(telemetryEndpoint string, metricsEndpoint string) (*u
endpoint = metricsEndpoint
}
// If endpoint is empty, server not registered with CCM, no error returned.
if endpoint == "" {
return nil, nil
return nil, errEmptyEndpoint
}
// Endpoint from CTW has no metrics path, so it must be added.
@ -142,7 +139,7 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.
if len(validFilters) == 0 {
logger.Error("no valid filters")
return defaultMetricFilters
return DefaultMetricFilters
}
// Combine the valid regex strings with OR.
@ -150,7 +147,7 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile final regex", "error", err)
return defaultMetricFilters
return DefaultMetricFilters
}
return composedRegex

47
agent/hcp/client/telemetry_config_test.go

@ -85,7 +85,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr error
expectedEnabled bool
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
@ -112,34 +111,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"successNoEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"test", "consul"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: nil,
Labels: map[string]string{"test": "test"},
Filters: validTestFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: false,
},
"successBadFilters": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
@ -160,13 +131,12 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
Filters: DefaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"errorsWithInvalidRefreshInterval": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
@ -206,7 +176,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
}
require.NoError(t, err)
require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg)
require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled())
})
}
}
@ -228,10 +197,10 @@ func TestConvertMetricEndpoint(t *testing.T) {
override: "https://override.com",
expected: "https://override.com/v1/metrics",
},
"noErrorWithEmptyEndpoints": {
"errorWithEmptyEndpoints": {
endpoint: "",
override: "",
expected: "",
wantErr: errEmptyEndpoint,
},
"errorWithInvalidURL": {
endpoint: " ",
@ -249,12 +218,6 @@ func TestConvertMetricEndpoint(t *testing.T) {
return
}
if tc.expected == "" {
require.Nil(t, u)
require.NoError(t, err)
return
}
require.NotNil(t, u)
require.NoError(t, err)
require.Equal(t, tc.expected, u.String())
@ -274,13 +237,13 @@ func TestConvertMetricFilters(t *testing.T) {
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},

39
agent/hcp/deps.go

@ -3,19 +3,19 @@ package hcp
import (
"context"
"fmt"
"time"
"github.com/armon/go-metrics"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/go-hclog"
)
// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
type Deps struct {
Client hcpclient.Client
Client client.Client
Provider scada.Provider
Sink metrics.MetricSink
}
@ -24,7 +24,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)
client, err := hcpclient.NewClient(cfg)
hcpClient, err := client.NewClient(cfg)
if err != nil {
return Deps{}, fmt.Errorf("failed to init client: %w", err)
}
@ -34,50 +34,33 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}
metricsClient, err := hcpclient.NewMetricsClient(ctx, &cfg)
metricsClient, err := client.NewMetricsClient(ctx, &cfg)
if err != nil {
logger.Error("failed to init metrics client", "error", err)
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
}
sink, err := sink(ctx, client, metricsClient)
sink, err := sink(ctx, metricsClient, NewHCPProvider(ctx, hcpClient))
if err != nil {
// Do not prevent server start if sink init fails, only log error.
logger.Error("failed to init sink", "error", err)
}
return Deps{
Client: client,
Client: hcpClient,
Provider: provider,
Sink: sink,
}, nil
}
// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, errors are returned, only to be logged.
func sink(
ctx context.Context,
hcpClient hcpclient.Client,
metricsClient telemetry.MetricsClient,
cfgProvider *hcpProviderImpl,
) (metrics.MetricSink, error) {
logger := hclog.FromContext(ctx).Named("sink")
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
return nil, fmt.Errorf("failed to fetch telemetry config: %w", err)
}
if !telemetryCfg.MetricsEnabled() {
return nil, nil
}
cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg)
if err != nil {
return nil, fmt.Errorf("failed to init config provider: %w", err)
}
logger := hclog.FromContext(ctx)
reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{
@ -87,7 +70,7 @@ func sink(
sink, err := telemetry.NewOTELSink(ctx, sinkOpts)
if err != nil {
return nil, fmt.Errorf("failed create OTELSink: %w", err)
return nil, fmt.Errorf("failed to create OTELSink: %w", err)
}
logger.Debug("initialized HCP metrics sink")

84
agent/hcp/deps_test.go

@ -2,16 +2,10 @@ package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
)
@ -21,79 +15,11 @@ type mockMetricsClient struct {
func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")
mt := mockTelemetryConfig(1*time.Second, u, filters)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
expectedSink: true,
},
"noSinkWhenFetchTelemetryConfigFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
wantErr: "failed to fetch telemetry config",
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mt := mockTelemetryConfig(1*time.Second, nil, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
},
"noSinkWhenTelemetryConfigProviderInitFails": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
// Bad refresh interval forces ConfigProvider creation failure.
mt := mockTelemetryConfig(0*time.Second, u, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
wantErr: "failed to init config provider",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
mc := mockMetricsClient{}
test.expect(c)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, err := sink(ctx, mockMetricsClient{}, &hcpProviderImpl{})
s, err := sink(ctx, c, mc)
if test.wantErr != "" {
require.NotNil(t, err)
require.Contains(t, err.Error(), test.wantErr)
require.Nil(t, s)
return
}
if !test.expectedSink {
require.Nil(t, s)
require.Nil(t, err)
return
}
require.NotNil(t, s)
})
}
}
func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig {
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: metricsEndpoint,
Filters: filters,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: refreshInterval,
},
}
require.NotNil(t, s)
require.NoError(t, err)
}

6
agent/hcp/telemetry/otel_exporter.go

@ -20,7 +20,9 @@ type MetricsClient interface {
// EndpointProvider provides the endpoint where metrics are exported to by the OTELExporter.
// EndpointProvider exposes the GetEndpoint() interface method to fetch the endpoint.
// This abstraction layer offers flexibility, in particular for dynamic configuration or changes to the endpoint.
// The OTELExporter calls the Disabled interface to verify that it should actually export metrics.
type EndpointProvider interface {
Disabled
GetEndpoint() *url.URL
}
@ -65,6 +67,10 @@ func (e *otelExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre
// Export serializes and transmits metric data to a receiver.
func (e *otelExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
if e.endpointProvider.IsDisabled() {
return nil
}
endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil {
return nil

17
agent/hcp/telemetry/otel_exporter_test.go

@ -31,9 +31,11 @@ func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *met
type mockEndpointProvider struct {
endpoint *url.URL
disabled bool
}
func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func (m *mockEndpointProvider) IsDisabled() bool { return m.disabled }
func TestTemporality(t *testing.T) {
t.Parallel()
@ -77,13 +79,20 @@ func TestExport(t *testing.T) {
client MetricsClient
provider EndpointProvider
}{
"earlyReturnDisabledProvider": {
client: &mockMetricsClient{},
provider: &mockEndpointProvider{
disabled: true,
},
},
"earlyReturnWithoutEndpoint": {
client: &mockMetricsClient{},
provider: &mockEndpointProvider{},
},
"earlyReturnWithoutScopeMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
provider: &mockEndpointProvider{},
},
"earlyReturnWithoutMetrics": {
client: &mockMetricsClient{},
@ -91,6 +100,7 @@ func TestExport(t *testing.T) {
{Metrics: []metricdata.Metrics{}},
},
),
provider: &mockEndpointProvider{},
},
"errorWithExportFailure": {
client: &mockMetricsClient{
@ -107,6 +117,9 @@ func TestExport(t *testing.T) {
},
},
),
provider: &mockEndpointProvider{
endpoint: &url.URL{},
},
wantErr: "failed to export metrics",
},
} {

22
agent/hcp/telemetry/otel_sink.go

@ -33,8 +33,15 @@ const (
defaultExportTimeout = 30 * time.Second
)
// Disabled should be implemented to turn on/off metrics processing
type Disabled interface {
// IsDisabled returns true to disallow the sink from accepting metrics.
IsDisabled() bool
}
// ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface {
Disabled
// GetLabels should return a set of OTEL attributes added by default to all metrics.
GetLabels() map[string]string
@ -144,8 +151,11 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
// SetGaugeWithLabels emits a Consul gauge metric that gets
// registered by an OpenTelemetry Histogram instrument.
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if o.cfgProvider.IsDisabled() {
return
}
k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}
@ -172,8 +182,11 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr
// AddSampleWithLabels emits a Consul sample metric that gets registered by an OpenTelemetry Histogram instrument.
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if o.cfgProvider.IsDisabled() {
return
}
k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}
@ -198,8 +211,11 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet
// IncrCounterWithLabels emits a Consul counter metric that gets registered by an OpenTelemetry Histogram instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if o.cfgProvider.IsDisabled() {
return
}
k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}

32
agent/hcp/telemetry/otel_sink_test.go

@ -18,8 +18,9 @@ import (
)
type mockConfigProvider struct {
filter *regexp.Regexp
labels map[string]string
filter *regexp.Regexp
labels map[string]string
disabled bool
}
func (m *mockConfigProvider) GetLabels() map[string]string {
@ -30,6 +31,10 @@ func (m *mockConfigProvider) GetFilters() *regexp.Regexp {
return m.filter
}
func (m *mockConfigProvider) IsDisabled() bool {
return m.disabled
}
var (
expectedResource = resource.NewSchemaless()
@ -220,6 +225,29 @@ func TestOTELSink(t *testing.T) {
isSame(t, expectedSinkMetrics, collected)
}
func TestOTELSinkDisabled(t *testing.T) {
reader := metric.NewManualReader()
ctx := context.Background()
sink, err := NewOTELSink(ctx, &OTELSinkOpts{
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("raft"),
disabled: true,
},
Reader: reader,
})
require.NoError(t, err)
sink.SetGauge([]string{"consul", "raft", "gauge"}, 1)
sink.IncrCounter([]string{"consul", "raft", "counter"}, 1)
sink.AddSample([]string{"consul", "raft", "sample"}, 1)
var collected metricdata.ResourceMetrics
err = reader.Collect(ctx, &collected)
require.NoError(t, err)
require.Empty(t, collected.ScopeMetrics)
}
func TestLabelsToAttributes(t *testing.T) {
for name, test := range map[string]struct {
providerLabels map[string]string

10
agent/hcp/telemetry/otlp_transform.go

@ -13,8 +13,8 @@ import (
)
var (
aggregationErr = errors.New("unsupported aggregation")
temporalityErr = errors.New("unsupported temporality")
errAggregaton = errors.New("unsupported aggregation")
errTemporality = errors.New("unsupported temporality")
)
// isEmpty verifies if the given OTLP protobuf metrics contains metric data.
@ -96,7 +96,7 @@ func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) {
}
case metricdata.Sum[float64]:
if a.Temporality != metricdata.CumulativeTemporality {
return out, fmt.Errorf("error: %w: %T", temporalityErr, a)
return out, fmt.Errorf("failed to convert metric to otel format: %w: %T", errTemporality, a)
}
out.Data = &mpb.Metric_Sum{
Sum: &mpb.Sum{
@ -107,7 +107,7 @@ func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) {
}
case metricdata.Histogram[float64]:
if a.Temporality != metricdata.CumulativeTemporality {
return out, fmt.Errorf("error: %w: %T", temporalityErr, a)
return out, fmt.Errorf("failed to convert metric to otel format: %w: %T", errTemporality, a)
}
out.Data = &mpb.Metric_Histogram{
Histogram: &mpb.Histogram{
@ -116,7 +116,7 @@ func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) {
},
}
default:
return out, fmt.Errorf("error: %w: %T", aggregationErr, a)
return out, fmt.Errorf("failed to convert metric to otel format: %w: %T", errAggregaton, a)
}
return out, nil
}

6
agent/hcp/telemetry/otlp_transform_test.go

@ -257,15 +257,15 @@ func TestTransformOTLP(t *testing.T) {
// MetricType Error Test Cases
_, err := metricTypeToPB(invalidHistTemporality)
require.Error(t, err)
require.ErrorIs(t, err, temporalityErr)
require.ErrorIs(t, err, errTemporality)
_, err = metricTypeToPB(invalidSumTemporality)
require.Error(t, err)
require.ErrorIs(t, err, temporalityErr)
require.ErrorIs(t, err, errTemporality)
_, err = metricTypeToPB(invalidSumAgg)
require.Error(t, err)
require.ErrorIs(t, err, aggregationErr)
require.ErrorIs(t, err, errAggregaton)
// Metrics Test Case
m := metricsToPB(inputMetrics)

112
agent/hcp/telemetry_provider.go

@ -2,13 +2,13 @@ package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/go-openapi/runtime"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/hcp/client"
@ -20,6 +20,8 @@ var (
internalMetricRefreshFailure []string = []string{"hcp", "telemetry_config_provider", "refresh", "failure"}
// internalMetricRefreshSuccess is a metric to monitor refresh successes.
internalMetricRefreshSuccess []string = []string{"hcp", "telemetry_config_provider", "refresh", "success"}
// defaultTelemetryConfigRefreshInterval is a default fallback in case the first HCP fetch fails.
defaultTelemetryConfigRefreshInterval = 1 * time.Minute
)
// Ensure hcpProviderImpl implements telemetry provider interfaces.
@ -43,47 +45,50 @@ type hcpProviderImpl struct {
// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
// fields MUST be exported to compute hash for equals method.
type dynamicConfig struct {
Endpoint *url.URL
Labels map[string]string
Filters *regexp.Regexp
disabled bool
endpoint *url.URL
labels map[string]string
filters *regexp.Regexp
// refreshInterval controls the interval at which configuration is fetched from HCP to refresh config.
RefreshInterval time.Duration
refreshInterval time.Duration
}
// NewHCPProvider initializes and starts a HCP Telemetry provider with provided params.
func NewHCPProvider(ctx context.Context, hcpClient client.Client, telemetryCfg *client.TelemetryConfig) (*hcpProviderImpl, error) {
refreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
// refreshInterval must be greater than 0, otherwise time.Ticker panics.
if refreshInterval <= 0 {
return nil, fmt.Errorf("invalid refresh interval: %d", refreshInterval)
}
cfg := &dynamicConfig{
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
Filters: telemetryCfg.MetricsConfig.Filters,
RefreshInterval: refreshInterval,
// defaultDisabledCfg disables metric collection and contains default config values.
func defaultDisabledCfg() *dynamicConfig {
return &dynamicConfig{
labels: map[string]string{},
filters: client.DefaultMetricFilters,
refreshInterval: defaultTelemetryConfigRefreshInterval,
endpoint: nil,
disabled: true,
}
}
t := &hcpProviderImpl{
cfg: cfg,
// NewHCPProvider initializes and starts a HCP Telemetry provider.
func NewHCPProvider(ctx context.Context, hcpClient client.Client) *hcpProviderImpl {
h := &hcpProviderImpl{
// Initialize with default config values.
cfg: defaultDisabledCfg(),
hcpClient: hcpClient,
}
go t.run(ctx, refreshInterval)
go h.run(ctx)
return t, nil
return h
}
// run continuously checks for updates to the telemetry configuration by making a request to HCP.
func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
func (h *hcpProviderImpl) run(ctx context.Context) {
// Try to initialize config once before starting periodic fetch.
h.updateConfig(ctx)
ticker := time.NewTicker(h.cfg.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if newCfg := h.getUpdate(ctx); newCfg != nil {
ticker.Reset(newCfg.RefreshInterval)
if newRefreshInterval := h.updateConfig(ctx); newRefreshInterval > 0 {
ticker.Reset(newRefreshInterval)
}
case <-ctx.Done():
return
@ -91,9 +96,8 @@ func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration
}
}
// getUpdate makes a HTTP request to HCP to return a new metrics configuration
// and updates the hcpProviderImpl.
func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
// updateConfig makes a HTTP request to HCP to update metrics configuration held in the provider.
func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration {
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
@ -101,9 +105,18 @@ func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx)
if err != nil {
// Only disable metrics on client (4XX) errors, such as 404 or 401, to handle the case of an unlinked cluster.
// For other errors such as 5XX ones, we continue metrics collection, as these are potentially transient server-side errors.
apiErr, ok := err.(*runtime.APIError)
if ok && apiErr.IsClientError() {
disabledMetricsCfg := defaultDisabledCfg()
h.modifyDynamicCfg(disabledMetricsCfg)
return disabledMetricsCfg.refreshInterval
}
logger.Error("failed to fetch telemetry config from HCP", "error", err)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
return 0
}
// newRefreshInterval of 0 or less can cause ticker Reset() panic.
@ -111,24 +124,29 @@ func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
if newRefreshInterval <= 0 {
logger.Error("invalid refresh interval duration", "refreshInterval", newRefreshInterval)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
return 0
}
newDynamicConfig := &dynamicConfig{
Filters: telemetryCfg.MetricsConfig.Filters,
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
RefreshInterval: newRefreshInterval,
newCfg := &dynamicConfig{
filters: telemetryCfg.MetricsConfig.Filters,
endpoint: telemetryCfg.MetricsConfig.Endpoint,
labels: telemetryCfg.MetricsConfig.Labels,
refreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
disabled: telemetryCfg.MetricsConfig.Disabled,
}
// Acquire write lock to update new configuration.
h.modifyDynamicCfg(newCfg)
return newCfg.refreshInterval
}
// modifyDynamicCfg acquires a write lock to update new configuration and emits a success metric.
func (h *hcpProviderImpl) modifyDynamicCfg(newCfg *dynamicConfig) {
h.rw.Lock()
h.cfg = newDynamicConfig
h.cfg = newCfg
h.rw.Unlock()
metrics.IncrCounter(internalMetricRefreshSuccess, 1)
return newDynamicConfig
}
// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
@ -136,7 +154,7 @@ func (h *hcpProviderImpl) GetEndpoint() *url.URL {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Endpoint
return h.cfg.endpoint
}
// GetFilters acquires a read lock to return filters configuration for consumers.
@ -144,7 +162,7 @@ func (h *hcpProviderImpl) GetFilters() *regexp.Regexp {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Filters
return h.cfg.filters
}
// GetLabels acquires a read lock to return labels configuration for consumers.
@ -152,5 +170,13 @@ func (h *hcpProviderImpl) GetLabels() map[string]string {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Labels
return h.cfg.labels
}
// IsDisabled acquires a read lock and returns true if metrics are disabled.
func (h *hcpProviderImpl) IsDisabled() bool {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.disabled
}

208
agent/hcp/telemetry_provider_test.go

@ -2,6 +2,7 @@ package hcp
import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
@ -11,6 +12,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/go-openapi/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -36,64 +38,49 @@ type testConfig struct {
endpoint string
labels map[string]string
refreshInterval time.Duration
disabled bool
}
func TestNewTelemetryConfigProvider(t *testing.T) {
func TestNewTelemetryConfigProvider_DefaultConfig(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
testInputs *testConfig
wantErr string
}{
"success": {
testInputs: &testConfig{
refreshInterval: 1 * time.Second,
},
},
"failsWithInvalidRefreshInterval": {
testInputs: &testConfig{
refreshInterval: 0 * time.Second,
},
wantErr: "invalid refresh interval",
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize new provider, but fail all HCP fetches.
mc := client.NewMockClient(t)
mc.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, errors.New("failed to fetch config"))
testCfg, err := testTelemetryCfg(tc.testInputs)
require.NoError(t, err)
provider := NewHCPProvider(ctx, mc)
provider.updateConfig(ctx)
cfgProvider, err := NewHCPProvider(ctx, client.NewMockClient(t), testCfg)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
require.Nil(t, cfgProvider)
return
}
require.NotNil(t, cfgProvider)
})
// Assert provider has default configuration and metrics processing is disabled.
defaultCfg := &dynamicConfig{
labels: map[string]string{},
filters: client.DefaultMetricFilters,
refreshInterval: defaultTelemetryConfigRefreshInterval,
endpoint: nil,
disabled: true,
}
require.Equal(t, defaultCfg, provider.cfg)
}
func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
for name, tc := range map[string]struct {
mockExpect func(*client.MockClient)
metricKey string
optsInputs *testConfig
expected *testConfig
mockExpect func(*client.MockClient)
metricKey string
initCfg *dynamicConfig
expected *dynamicConfig
expectedInterval time.Duration
}{
"noChanges": {
optsInputs: &testConfig{
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
@ -105,25 +92,26 @@ func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
},
metricKey: testMetricKeySuccess,
}),
metricKey: testMetricKeySuccess,
expectedInterval: testRefreshInterval,
},
"newConfig": {
optsInputs: &testConfig{
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: 2 * time.Second,
},
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://newendpoint/v1/metrics",
@ -135,83 +123,136 @@ func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
expected: testDynamicCfg(&testConfig{
endpoint: "http://newendpoint/v1/metrics",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
}),
expectedInterval: 2 * time.Second,
metricKey: testMetricKeySuccess,
},
"newConfigMetricsDisabled": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: 2 * time.Second,
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
disabled: true,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
metricKey: testMetricKeySuccess,
expected: testDynamicCfg(&testConfig{
endpoint: "",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
disabled: true,
}),
metricKey: testMetricKeySuccess,
expectedInterval: 2 * time.Second,
},
"sameConfigInvalidRefreshInterval": {
optsInputs: &testConfig{
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
refreshInterval: 0 * time.Second,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
},
metricKey: testMetricKeyFailure,
}),
metricKey: testMetricKeyFailure,
expectedInterval: 0,
},
"sameConfigHCPClientFailure": {
optsInputs: &testConfig{
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
}),
mockExpect: func(m *client.MockClient) {
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("failure"))
},
expected: &testConfig{
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
metricKey: testMetricKeyFailure,
expectedInterval: 0,
},
"disableMetrics404": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
mockExpect: func(m *client.MockClient) {
err := runtime.NewAPIError("404 failure", nil, 404)
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, err)
},
metricKey: testMetricKeyFailure,
expected: defaultDisabledCfg(),
metricKey: testMetricKeySuccess,
expectedInterval: defaultTelemetryConfigRefreshInterval,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
sink := initGlobalSink()
mockClient := client.NewMockClient(t)
tc.mockExpect(mockClient)
dynamicCfg, err := testDynamicCfg(tc.optsInputs)
require.NoError(t, err)
provider := &hcpProviderImpl{
hcpClient: mockClient,
cfg: dynamicCfg,
cfg: tc.initCfg,
}
provider.getUpdate(context.Background())
newInterval := provider.updateConfig(context.Background())
require.Equal(t, tc.expectedInterval, newInterval)
// Verify endpoint provider returns correct config values.
require.Equal(t, tc.expected.endpoint, provider.GetEndpoint().String())
require.Equal(t, tc.expected.filters, provider.GetFilters().String())
require.Equal(t, tc.expected.endpoint, provider.GetEndpoint())
require.Equal(t, tc.expected.filters, provider.GetFilters())
require.Equal(t, tc.expected.labels, provider.GetLabels())
require.Equal(t, tc.expected.disabled, provider.IsDisabled())
// Verify count for transform success metric.
interval := sink.Data()[0]
@ -289,8 +330,7 @@ func TestTelemetryConfigProvider_Race(t *testing.T) {
}
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
provider, err := NewHCPProvider(ctx, m, m.cfg)
require.NoError(t, err)
provider := NewHCPProvider(ctx, m)
for count := 0; count < testRaceWriteSampleCount; count++ {
// Force a TelemetryConfig value change in the mockRaceClient.
@ -298,7 +338,7 @@ func TestTelemetryConfigProvider_Race(t *testing.T) {
require.NoError(t, err)
// Force provider to obtain new client TelemetryConfig immediately.
// This call is necessary to guarantee TelemetryConfig changes to assert on expected values below.
provider.getUpdate(context.Background())
provider.updateConfig(context.Background())
// Start goroutines to access label configuration.
wg := &sync.WaitGroup{}
@ -342,22 +382,20 @@ func initGlobalSink() *metrics.InmemSink {
}
// testDynamicCfg converts testConfig inputs to a dynamicConfig to be used in tests.
// The filters string is compiled as a regular expression and a non-empty endpoint
// string is parsed as a URL; labels, refreshInterval and disabled are copied over
// from testCfg unchanged.
func testDynamicCfg(testCfg *testConfig) (*dynamicConfig, error) {
filters, err := regexp.Compile(testCfg.filters)
if err != nil {
return nil, err
}
func testDynamicCfg(testCfg *testConfig) *dynamicConfig {
// The compile error is discarded, so an invalid pattern leaves filters nil.
// NOTE(review): presumably test inputs are always valid patterns — confirm.
filters, _ := regexp.Compile(testCfg.filters)
endpoint, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
// endpoint stays nil when testCfg.endpoint is the empty string; otherwise it is
// parsed, with the parse error discarded.
var endpoint *url.URL
if testCfg.endpoint != "" {
endpoint, _ = url.Parse(testCfg.endpoint)
}
return &dynamicConfig{
Endpoint: endpoint,
Filters: filters,
Labels: testCfg.labels,
RefreshInterval: testCfg.refreshInterval,
}, nil
endpoint: endpoint,
filters: filters,
labels: testCfg.labels,
refreshInterval: testCfg.refreshInterval,
disabled: testCfg.disabled,
}
}
// testTelemetryCfg converts testConfig inputs to a TelemetryConfig to be used in tests.
@ -367,15 +405,21 @@ func testTelemetryCfg(testCfg *testConfig) (*client.TelemetryConfig, error) {
return nil, err
}
endpoint, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
var endpoint *url.URL
if testCfg.endpoint != "" {
u, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
}
endpoint = u
}
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: endpoint,
Filters: filters,
Labels: testCfg.labels,
Disabled: testCfg.disabled,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testCfg.refreshInterval,

3
go.mod

@ -60,7 +60,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hcp-scada-provider v0.2.3
github.com/hashicorp/hcp-sdk-go v0.55.0
github.com/hashicorp/hcp-sdk-go v0.61.0
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
github.com/hashicorp/memberlist v0.5.0
github.com/hashicorp/raft v1.5.0
@ -230,6 +230,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
github.com/vmware/govmomi v0.18.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.mongodb.org/mongo-driver v1.11.0 // indirect
go.opencensus.io v0.24.0 // indirect

6
go.sum

@ -543,8 +543,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ=
github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo=
github.com/hashicorp/hcp-sdk-go v0.55.0 h1:T4sQtgQfQJOD0uucT4hS+GZI1FmoHAQMADj277W++xw=
github.com/hashicorp/hcp-sdk-go v0.55.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc=
github.com/hashicorp/hcp-sdk-go v0.61.0 h1:x4hJ8SlLI5WCE8Uzcu4q5jfdOEz/hFxfUkhAdoFdzSg=
github.com/hashicorp/hcp-sdk-go v0.61.0/go.mod h1:xP7wmWAmdMxs/7+ovH3jZn+MCDhHRj50Rn+m7JIY3Ck=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@ -888,6 +888,8 @@ github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=

Loading…
Cancel
Save