diff --git a/agent/consul/server.go b/agent/consul/server.go index 8ef2f5e053..2ef6aee71d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -582,11 +582,12 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, }) s.hcpManager = hcp.NewManager(hcp.ManagerConfig{ - CloudConfig: s.config.Cloud, - Client: flat.HCP.Client, - StatusFn: s.hcpServerStatus(flat), - Logger: logger.Named("hcp_manager"), - SCADAProvider: flat.HCP.Provider, + CloudConfig: s.config.Cloud, + Client: flat.HCP.Client, + StatusFn: s.hcpServerStatus(flat), + Logger: logger.Named("hcp_manager"), + SCADAProvider: flat.HCP.Provider, + TelemetryProvider: flat.HCP.TelemetryProvider, }) var recorder *middleware.RequestRecorder @@ -931,7 +932,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, go s.updateMetrics() // Now we are setup, configure the HCP manager - go s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) + go func() { + err := s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) + if err != nil { + logger.Error("error starting HCP manager, some HashiCorp Cloud Platform functionality has been disabled", + "error", err) + } + }() err = s.runEnterpriseRateLimiterConfigEntryController() if err != nil { diff --git a/agent/hcp/client/http_client.go b/agent/hcp/client/http_client.go new file mode 100644 index 0000000000..6692ed586a --- /dev/null +++ b/agent/hcp/client/http_client.go @@ -0,0 +1,56 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package client + +import ( + "crypto/tls" + "net/http" + "time" + + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-retryablehttp" + "golang.org/x/oauth2" +) + +const ( + // HTTP Client config + defaultStreamTimeout = 15 * time.Second + + // Retry config + // TODO: Eventually, we'd like to configure these values dynamically. + defaultRetryWaitMin = 1 * time.Second + defaultRetryWaitMax = 15 * time.Second + // defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible. + // This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now. + defaultRetryMax = 0 +) + +// NewHTTPClient configures the retryable HTTP client. +func NewHTTPClient(tlsCfg *tls.Config, source oauth2.TokenSource, logger hclog.Logger) *retryablehttp.Client { + tlsTransport := cleanhttp.DefaultPooledTransport() + tlsTransport.TLSClientConfig = tlsCfg + + var transport http.RoundTripper = &oauth2.Transport{ + Base: tlsTransport, + Source: source, + } + + client := &http.Client{ + Transport: transport, + Timeout: defaultStreamTimeout, + } + + retryClient := &retryablehttp.Client{ + HTTPClient: client, + Logger: logger, + RetryWaitMin: defaultRetryWaitMin, + RetryWaitMax: defaultRetryWaitMax, + RetryMax: defaultRetryMax, + CheckRetry: retryablehttp.DefaultRetryPolicy, + Backoff: retryablehttp.DefaultBackoff, + } + + return retryClient +} diff --git a/agent/hcp/client/http_client_test.go b/agent/hcp/client/http_client_test.go new file mode 100644 index 0000000000..b30329d0b1 --- /dev/null +++ b/agent/hcp/client/http_client_test.go @@ -0,0 +1,31 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package client + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/consul/agent/hcp/config" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestNewHTTPClient(t *testing.T) { + mockCfg := config.MockCloudCfg{} + mockHCPCfg, err := mockCfg.HCPConfig() + require.NoError(t, err) + + client := NewHTTPClient(mockHCPCfg.APITLSConfig(), mockHCPCfg, hclog.NewNullLogger()) + require.NotNil(t, client) + + var req *http.Request + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req = r + })) + _, err = client.Get(srv.URL) + require.NoError(t, err) + require.Equal(t, "Bearer test-token", req.Header.Get("Authorization")) +} diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index b3c1c6a6b3..47dd7d5280 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -6,126 +6,55 @@ package client import ( "bytes" "context" + "errors" "fmt" "io" "net/http" - "time" - "github.com/hashicorp/go-cleanhttp" - "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-retryablehttp" - hcpcfg "github.com/hashicorp/hcp-sdk-go/config" - "github.com/hashicorp/hcp-sdk-go/resource" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" - "golang.org/x/oauth2" "google.golang.org/protobuf/proto" "github.com/hashicorp/consul/agent/hcp/telemetry" - "github.com/hashicorp/consul/version" ) const ( - // HTTP Client config - defaultStreamTimeout = 15 * time.Second - - // Retry config - // TODO: Eventually, we'd like to configure these values dynamically. - defaultRetryWaitMin = 1 * time.Second - defaultRetryWaitMax = 15 * time.Second - // defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible. - // This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now. - defaultRetryMax = 0 - // defaultErrRespBodyLength refers to the max character length of the body on a failure to export metrics. // anything beyond we will truncate. defaultErrRespBodyLength = 100 ) -// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. -type CloudConfig interface { - HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) - Resource() (resource.Resource, error) +// MetricsClientProvider provides the retryable HTTP client and headers to use for exporting metrics +// by the metrics client. +type MetricsClientProvider interface { + GetHTTPClient() *retryablehttp.Client + GetHeader() http.Header } // otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. // It also holds default HTTP headers to add to export requests. type otlpClient struct { - client *retryablehttp.Client - header *http.Header + provider MetricsClientProvider } // NewMetricsClient returns a configured MetricsClient. // The current implementation uses otlpClient to provide retry functionality. 
-func NewMetricsClient(ctx context.Context, cfg CloudConfig) (telemetry.MetricsClient, error) { - if cfg == nil { - return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)") - } - - if ctx == nil { - return nil, fmt.Errorf("failed to init telemetry client: provide a valid context") - } - - logger := hclog.FromContext(ctx) - - c, err := newHTTPClient(cfg, logger) - if err != nil { - return nil, fmt.Errorf("failed to init telemetry client: %v", err) - } - - r, err := cfg.Resource() - if err != nil { - return nil, fmt.Errorf("failed to init telemetry client: %v", err) - } - - header := make(http.Header) - header.Set("content-type", "application/x-protobuf") - header.Set("x-hcp-resource-id", r.String()) - header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) - +func NewMetricsClient(ctx context.Context, provider MetricsClientProvider) telemetry.MetricsClient { return &otlpClient{ - client: c, - header: &header, - }, nil -} - -// newHTTPClient configures the retryable HTTP client. -func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { - hcpCfg, err := cloudCfg.HCPConfig() - if err != nil { - return nil, err + provider: provider, } - - tlsTransport := cleanhttp.DefaultPooledTransport() - tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig() - - var transport http.RoundTripper = &oauth2.Transport{ - Base: tlsTransport, - Source: hcpCfg, - } - - client := &http.Client{ - Transport: transport, - Timeout: defaultStreamTimeout, - } - - retryClient := &retryablehttp.Client{ - HTTPClient: client, - Logger: logger.Named("hcp_telemetry_client"), - RetryWaitMin: defaultRetryWaitMin, - RetryWaitMax: defaultRetryWaitMax, - RetryMax: defaultRetryMax, - CheckRetry: retryablehttp.DefaultRetryPolicy, - Backoff: retryablehttp.DefaultBackoff, - } - - return retryClient, nil } // ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint. // The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config. // By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request. 
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { + client := o.provider.GetHTTPClient() + if client == nil { + return errors.New("http client not configured") + } + pbRequest := &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, } @@ -139,9 +68,9 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R if err != nil { return fmt.Errorf("failed to create request: %w", err) } - req.Header = *o.header + req.Header = o.provider.GetHeader() - resp, err := o.client.Do(req.WithContext(ctx)) + resp, err := client.Do(req.WithContext(ctx)) if err != nil { return fmt.Errorf("failed to post metrics: %w", err) } diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go index 20a5f010ec..cea6efca37 100644 --- a/agent/hcp/client/metrics_client_test.go +++ b/agent/hcp/client/metrics_client_test.go @@ -5,7 +5,6 @@ package client import ( "context" - "fmt" "math/rand" "net/http" "net/http/httptest" @@ -16,55 +15,26 @@ import ( metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" "google.golang.org/protobuf/proto" - "github.com/hashicorp/consul/version" + "github.com/hashicorp/go-retryablehttp" ) -func TestNewMetricsClient(t *testing.T) { - for name, test := range map[string]struct { - wantErr string - cfg CloudConfig - ctx context.Context - }{ - "success": { - cfg: &MockCloudCfg{}, - ctx: context.Background(), - }, - "failsWithoutCloudCfg": { - wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)", - cfg: nil, - ctx: context.Background(), - }, - "failsWithoutContext": { - wantErr: "failed to init telemetry client: provide a valid context", - cfg: MockCloudCfg{}, - ctx: nil, - }, - "failsHCPConfig": { - wantErr: "failed to init telemetry client", - cfg: MockCloudCfg{ - ConfigErr: fmt.Errorf("test bad hcp config"), - }, - ctx: context.Background(), - }, - "failsBadResource": { - wantErr: "failed to init telemetry client", - cfg: MockCloudCfg{ - ResourceErr: fmt.Errorf("test bad resource"), - }, - ctx: context.Background(), - }, - } { - t.Run(name, func(t *testing.T) { - client, err := NewMetricsClient(test.ctx, test.cfg) - if test.wantErr != "" { - require.Error(t, err) - require.Contains(t, err.Error(), test.wantErr) - return - } +type mockClientProvider struct { + client *retryablehttp.Client + header *http.Header +} - require.Nil(t, err) - require.NotNil(t, client) - }) +func (m *mockClientProvider) GetHTTPClient() *retryablehttp.Client { return m.client } +func (m *mockClientProvider) GetHeader() http.Header { return m.header.Clone() } + +func newMockClientProvider() *mockClientProvider { + header := make(http.Header) + header.Set("content-type", "application/x-protobuf") + + client := retryablehttp.NewClient() + + return &mockClientProvider{ + header: &header, + client: client, } } @@ -83,6 +53,7 @@ func TestExportMetrics(t *testing.T) { wantErr string status int largeBodyError bool + mutateProvider func(*mockClientProvider) }{ "success": { status: http.StatusOK, @@ -96,14 +67,17 @@ func TestExportMetrics(t *testing.T) { wantErr: "failed to export metrics: code 400", largeBodyError: true, }, + "failsWithClientNotConfigured": { + mutateProvider: func(m *mockClientProvider) { + m.client = nil + }, + wantErr: "http client not configured", + }, } { t.Run(name, func(t *testing.T) { randomBody := randStringRunes(1000) srv := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) { require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf") - require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID) - require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion())) - require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") body := colpb.ExportMetricsServiceResponse{} bytes, err := proto.Marshal(&body) @@ -121,12 +95,15 @@ func TestExportMetrics(t *testing.T) { })) defer srv.Close() - client, err := NewMetricsClient(context.Background(), MockCloudCfg{}) - require.NoError(t, err) + provider := newMockClientProvider() + if test.mutateProvider != nil { + test.mutateProvider(provider) + } + client := NewMetricsClient(context.Background(), provider) ctx := context.Background() metrics := &metricpb.ResourceMetrics{} - err = client.ExportMetrics(ctx, metrics, srv.URL) + err := client.ExportMetrics(ctx, metrics, srv.URL) if test.wantErr != "" { require.Error(t, err) diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index 59977ef46f..e501992aad 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -11,6 +11,13 @@ import ( "github.com/hashicorp/hcp-sdk-go/resource" ) +// CloudConfigurer abstracts the cloud config methods needed to connect to HCP +// in an interface for easier testing. +type CloudConfigurer interface { + HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) + Resource() (resource.Resource, error) +} + // CloudConfig defines configuration for connecting to HCP services type CloudConfig struct { ResourceID string diff --git a/agent/hcp/client/mock_CloudConfig.go b/agent/hcp/config/mock_CloudConfig.go similarity index 98% rename from agent/hcp/client/mock_CloudConfig.go rename to agent/hcp/config/mock_CloudConfig.go index 2dc523f487..e2c6ba0c53 100644 --- a/agent/hcp/client/mock_CloudConfig.go +++ b/agent/hcp/config/mock_CloudConfig.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: BUSL-1.1 -package client +package config import ( "crypto/tls" diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 145532d411..5c770f959e 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -18,9 +18,10 @@ import ( // Deps contains the interfaces that the rest of Consul core depends on for HCP integration. type Deps struct { - Client client.Client - Provider scada.Provider - Sink metrics.MetricSink + Client client.Client + Provider scada.Provider + Sink metrics.MetricSink + TelemetryProvider *hcpProviderImpl } func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) { @@ -37,22 +38,25 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) { return Deps{}, fmt.Errorf("failed to init scada: %w", err) } - metricsClient, err := client.NewMetricsClient(ctx, &cfg) + metricsProvider := NewHCPProvider(ctx) if err != nil { - logger.Error("failed to init metrics client", "error", err) - return Deps{}, fmt.Errorf("failed to init metrics client: %w", err) + logger.Error("failed to init HCP metrics provider", "error", err) + return Deps{}, fmt.Errorf("failed to init HCP metrics provider: %w", err) } - sink, err := sink(ctx, metricsClient, NewHCPProvider(ctx, hcpClient)) + metricsClient := client.NewMetricsClient(ctx, metricsProvider) + + sink, err := sink(ctx, metricsClient, metricsProvider) if err != nil { // Do not prevent server start if sink init fails, only log error. 
logger.Error("failed to init sink", "error", err) } return Deps{ - Client: hcpClient, - Provider: provider, - Sink: sink, + Client: hcpClient, + Provider: provider, + Sink: sink, + TelemetryProvider: metricsProvider, }, nil } diff --git a/agent/hcp/manager.go b/agent/hcp/manager.go index c5d3398499..d53e7e7695 100644 --- a/agent/hcp/manager.go +++ b/agent/hcp/manager.go @@ -21,9 +21,10 @@ var ( ) type ManagerConfig struct { - Client hcpclient.Client - CloudConfig config.CloudConfig - SCADAProvider scada.Provider + Client hcpclient.Client + CloudConfig config.CloudConfig + SCADAProvider scada.Provider + TelemetryProvider *hcpProviderImpl StatusFn StatusCallback MinInterval time.Duration @@ -66,9 +67,7 @@ type Manager struct { testUpdateSent chan struct{} } -// NewManager returns an initialized Manager with a zero configuration. It won't -// do anything until UpdateConfig is called with a config that provides -// credentials to contact HCP. +// NewManager returns a Manager initialized with the given configuration. func NewManager(cfg ManagerConfig) *Manager { return &Manager{ logger: cfg.Logger, @@ -83,23 +82,27 @@ func NewManager(cfg ManagerConfig) *Manager { // yet for servers since a config update might configure it later and // UpdateConfig called. It will effectively do nothing if there are no HCP // credentials set other than wait for some to be added. -func (m *Manager) Run(ctx context.Context) { +func (m *Manager) Run(ctx context.Context) error { var err error m.logger.Debug("HCP manager starting") // Update and start the SCADA provider err = m.startSCADAProvider() if err != nil { - // Log the error but continue starting the manager. The SCADA provider - // could potentially be updated later with a working configuration. - m.logger.Error("scada provider failed to start, some HashiCorp Cloud Platform functionality has been disabled", - "error", err) + m.logger.Error("failed to start scada provider", "error", err) + return err + } + + // Update and start the telemetry provider to enable the HCP metrics sink + if err := m.startTelemetryProvider(ctx); err != nil { + m.logger.Error("failed to update telemetry config provider", "error", err) + return err } // immediately send initial update select { case <-ctx.Done(): - return + return nil case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop err = m.sendUpdate() default: @@ -118,7 +121,7 @@ func (m *Manager) Run(ctx context.Context) { select { case <-ctx.Done(): - return + return nil case <-m.updateCh: err = m.sendUpdate() @@ -153,6 +156,18 @@ func (m *Manager) startSCADAProvider() error { if err != nil { return err } + return nil +} + +func (m *Manager) startTelemetryProvider(ctx context.Context) error { + if m.cfg.TelemetryProvider == nil { + return nil + } + + m.cfg.TelemetryProvider.Run(ctx, &HCPProviderCfg{ + HCPClient: m.cfg.Client, + HCPConfig: &m.cfg.CloudConfig, + }) return nil } diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index 2c29bc32c1..ad9d7461c1 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -38,12 +38,26 @@ func TestManager_Run(t *testing.T) { ).Return() scadaM.EXPECT().Start().Return(nil) + telemetryProvider := &hcpProviderImpl{ + httpCfg: &httpCfg{}, + logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + cfg: defaultDisabledCfg(), + } + + mockTelemetryCfg, err := testTelemetryCfg(&testConfig{ + refreshInterval: 1 * time.Second, + }) + require.NoError(t, err) + 
client.EXPECT().FetchTelemetryConfig(mock.Anything).Return( + mockTelemetryCfg, nil).Maybe() + mgr := NewManager(ManagerConfig{ - Client: client, - Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), - StatusFn: statusF, - CloudConfig: cloudCfg, - SCADAProvider: scadaM, + Client: client, + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + StatusFn: statusF, + CloudConfig: cloudCfg, + SCADAProvider: scadaM, + TelemetryProvider: telemetryProvider, }) mgr.testUpdateSent = updateCh ctx, cancel := context.WithCancel(context.Background()) @@ -59,6 +73,9 @@ func TestManager_Run(t *testing.T) { // Make sure after manager has stopped no more statuses are pushed. cancel() client.AssertExpectations(t) + require.Equal(t, client, telemetryProvider.hcpClient) + require.NotNil(t, telemetryProvider.GetHeader()) + require.NotNil(t, telemetryProvider.GetHTTPClient()) } func TestManager_SendUpdate(t *testing.T) { diff --git a/agent/hcp/telemetry_provider.go b/agent/hcp/telemetry_provider.go index 22bb0f2f00..34a55ebc3b 100644 --- a/agent/hcp/telemetry_provider.go +++ b/agent/hcp/telemetry_provider.go @@ -5,7 +5,11 @@ package hcp import ( "context" + "errors" + "fmt" + "net/http" "net/url" + "reflect" "regexp" "sync" "time" @@ -13,9 +17,12 @@ import ( "github.com/armon/go-metrics" "github.com/go-openapi/runtime" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-retryablehttp" "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/telemetry" + "github.com/hashicorp/consul/version" ) var ( @@ -30,19 +37,33 @@ var ( // Ensure hcpProviderImpl implements telemetry provider interfaces. var _ telemetry.ConfigProvider = &hcpProviderImpl{} var _ telemetry.EndpointProvider = &hcpProviderImpl{} +var _ client.MetricsClientProvider = &hcpProviderImpl{} // hcpProviderImpl holds telemetry configuration and settings for continuous fetch of new config from HCP. // it updates configuration, if changes are detected. type hcpProviderImpl struct { // cfg holds configuration that can be dynamically updated. cfg *dynamicConfig + // httpCfg holds configuration for the HTTP client + httpCfg *httpCfg - // A reader-writer mutex is used as the provider is read heavy. + // Reader-writer mutexes are used as the provider is read heavy. // OTEL components access telemetryConfig during metrics collection and export (read). - // Meanwhile, config is only updated when there are changes (write). - rw sync.RWMutex + // Meanwhile, configs are only updated when there are changes (write). + rw sync.RWMutex + httpCfgRW sync.RWMutex + + // running indicates if the HCP telemetry config provider has been started + running bool + // hcpClient is an authenticated client used to make HTTP requests to HCP. hcpClient client.Client + + // logger is the HCP logger for the provider + logger hclog.Logger + + // testUpdateConfigCh is used by unit tests to signal when an update config has occurred + testUpdateConfigCh chan struct{} } // dynamicConfig is a set of configurable settings for metrics collection, processing and export. @@ -67,21 +88,56 @@ func defaultDisabledCfg() *dynamicConfig { } } +// httpCfg is a set of configurable settings for the HTTP client used to export metrics +type httpCfg struct { + header *http.Header + client *retryablehttp.Client +} + +type HCPProviderCfg struct { + HCPClient client.Client + HCPConfig config.CloudConfigurer +} + // NewHCPProvider initializes and starts a HCP Telemetry provider. 
-func NewHCPProvider(ctx context.Context, hcpClient client.Client) *hcpProviderImpl { +func NewHCPProvider(ctx context.Context) *hcpProviderImpl { h := &hcpProviderImpl{ // Initialize with default config values. - cfg: defaultDisabledCfg(), - hcpClient: hcpClient, + cfg: defaultDisabledCfg(), + httpCfg: &httpCfg{}, + logger: hclog.FromContext(ctx), } - go h.run(ctx) - return h } -// run continously checks for updates to the telemetry configuration by making a request to HCP. -func (h *hcpProviderImpl) run(ctx context.Context) { +// Run starts a process that continuously checks for updates to the telemetry configuration +// by making a request to HCP. It only starts running if it's not already running. +func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error { + if h.isRunning() { + return nil + } + + h.rw.Lock() + h.running = true + h.rw.Unlock() + + // Update the provider with the HCP configurations + h.hcpClient = c.HCPClient + err := h.updateHTTPConfig(c.HCPConfig) + if err != nil { + return fmt.Errorf("failed to initialize HCP telemetry provider: %v", err) + } + + go h.run(ctx) + + return nil +} + +// run continuously checks for updates to the telemetry configuration by making a request to HCP. +func (h *hcpProviderImpl) run(ctx context.Context) error { + h.logger.Debug("starting telemetry config provider") + // Try to initialize config once before starting periodic fetch. h.updateConfig(ctx) @@ -94,18 +150,35 @@ func (h *hcpProviderImpl) run(ctx context.Context) { ticker.Reset(newRefreshInterval) } case <-ctx.Done(): - return + return nil } } } // updateConfig makes a HTTP request to HCP to update metrics configuration held in the provider. func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration { - logger := hclog.FromContext(ctx).Named("telemetry_config_provider") + logger := h.logger.Named("telemetry_config_provider") + + if h.testUpdateConfigCh != nil { + defer func() { + select { + case h.testUpdateConfigCh <- struct{}{}: + default: + } + }() + } + + if h.hcpClient == nil || reflect.ValueOf(h.hcpClient).IsNil() { + // Disable metrics if HCP client is not configured + disabledMetricsCfg := defaultDisabledCfg() + h.modifyDynamicCfg(disabledMetricsCfg) + return disabledMetricsCfg.refreshInterval + } ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + logger.Trace("fetching telemetry config") telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx) if err != nil { // Only disable metrics on 404 or 401 to handle the case of an unlinked cluster. @@ -121,6 +194,7 @@ func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration { metrics.IncrCounter(internalMetricRefreshFailure, 1) return 0 } + logger.Trace("successfully fetched telemetry config") // newRefreshInterval of 0 or less can cause ticker Reset() panic. newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval @@ -183,3 +257,66 @@ func (h *hcpProviderImpl) IsDisabled() bool { return h.cfg.disabled } + +// updateHTTPConfig updates the HTTP configuration values that rely on the HCP configuration. 
+func (h *hcpProviderImpl) updateHTTPConfig(cfg config.CloudConfigurer) error { + h.httpCfgRW.Lock() + defer h.httpCfgRW.Unlock() + + if cfg == nil { + return errors.New("must provide valid HCP configuration") + } + + // Update headers + r, err := cfg.Resource() + if err != nil { + return fmt.Errorf("failed set telemetry client headers: %v", err) + } + header := make(http.Header) + header.Set("content-type", "application/x-protobuf") + header.Set("x-hcp-resource-id", r.String()) + header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) + h.httpCfg.header = &header + + // Update HTTP client + hcpCfg, err := cfg.HCPConfig() + if err != nil { + return fmt.Errorf("failed to configure telemetry HTTP client: %v", err) + } + h.httpCfg.client = client.NewHTTPClient( + hcpCfg.APITLSConfig(), + hcpCfg, + h.logger.Named("hcp_telemetry_client")) + + return nil +} + +// GetHeader acquires a read lock to return the HTTP request headers needed +// to export metrics. +func (h *hcpProviderImpl) GetHeader() http.Header { + h.httpCfgRW.RLock() + defer h.httpCfgRW.RUnlock() + + if h.httpCfg.header == nil { + return nil + } + + return h.httpCfg.header.Clone() +} + +// GetHTTPClient acquires a read lock to return the retryable HTTP client needed +// to export metrics. +func (h *hcpProviderImpl) GetHTTPClient() *retryablehttp.Client { + h.httpCfgRW.RLock() + defer h.httpCfgRW.RUnlock() + + return h.httpCfg.client +} + +// isRunning acquires a read lock to return whether the provider is running. +func (h *hcpProviderImpl) isRunning() bool { + h.rw.RLock() + defer h.rw.RUnlock() + + return h.running +} diff --git a/agent/hcp/telemetry_provider_test.go b/agent/hcp/telemetry_provider_test.go index a147183df4..03bb15b265 100644 --- a/agent/hcp/telemetry_provider_test.go +++ b/agent/hcp/telemetry_provider_test.go @@ -5,8 +5,8 @@ package hcp import ( "context" - "errors" "fmt" + "net/http" "net/url" "regexp" "strings" @@ -21,6 +21,9 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/agent/hcp/config" + "github.com/hashicorp/consul/version" + "github.com/hashicorp/go-hclog" ) const ( @@ -49,11 +52,8 @@ func TestNewTelemetryConfigProvider_DefaultConfig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Initialize new provider, but fail all HCP fetches. - mc := client.NewMockClient(t) - mc.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, errors.New("failed to fetch config")) - - provider := NewHCPProvider(ctx, mc) + // Initialize new provider + provider := NewHCPProvider(ctx) provider.updateConfig(ctx) // Assert provider has default configuration and metrics processing is disabled. 
@@ -74,6 +74,7 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) { initCfg *dynamicConfig expected *dynamicConfig expectedInterval time.Duration + skipHCPClient bool }{ "noChanges": { initCfg: testDynamicCfg(&testConfig{ @@ -236,16 +237,34 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) { metricKey: testMetricKeySuccess, expectedInterval: defaultTelemetryConfigRefreshInterval, }, + "hcpClientNotConfigured": { + skipHCPClient: true, + initCfg: testDynamicCfg(&testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }), + expected: defaultDisabledCfg(), + metricKey: testMetricKeySuccess, + expectedInterval: defaultTelemetryConfigRefreshInterval, + }, } { tc := tc t.Run(name, func(t *testing.T) { sink := initGlobalSink() - mockClient := client.NewMockClient(t) - tc.mockExpect(mockClient) + var mockClient *client.MockClient + if !tc.skipHCPClient { + mockClient = client.NewMockClient(t) + tc.mockExpect(mockClient) + } provider := &hcpProviderImpl{ hcpClient: mockClient, cfg: tc.initCfg, + logger: hclog.NewNullLogger(), } newInterval := provider.updateConfig(context.Background()) @@ -267,6 +286,98 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) { } } +func TestTelemetryConfigProvider_Run(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := NewHCPProvider(ctx) + + testUpdateConfigCh := make(chan struct{}, 1) + provider.testUpdateConfigCh = testUpdateConfigCh + + // Configure mocks + mockClient := client.NewMockClient(t) + mTelemetryCfg, err := testTelemetryCfg(&testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }) + require.NoError(t, err) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil) + mockHCPCfg := &config.MockCloudCfg{} + + // Run provider + go provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + var count int + select { + case <-testUpdateConfigCh: + // Expect/wait for at least two update config calls + count++ + if count > 2 { + break + } + case <-time.After(time.Second): + require.Fail(t, "provider did not attempt to update config in expected time") + } + + mockClient.AssertExpectations(t) +} + +func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + cfg config.CloudConfigurer + }{ + "success": { + cfg: &config.MockCloudCfg{}, + }, + "failsWithoutCloudCfg": { + wantErr: "must provide valid HCP configuration", + cfg: nil, + }, + "failsHCPConfig": { + wantErr: "failed to configure telemetry HTTP client", + cfg: config.MockCloudCfg{ + ConfigErr: fmt.Errorf("test bad hcp config"), + }, + }, + "failsBadResource": { + wantErr: "failed set telemetry client headers", + cfg: config.MockCloudCfg{ + ResourceErr: fmt.Errorf("test bad resource"), + }, + }, + } { + t.Run(name, func(t *testing.T) { + provider := NewHCPProvider(context.Background()) + err := provider.updateHTTPConfig(test.cfg) + + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NoError(t, err) + require.NotNil(t, provider.GetHTTPClient()) + + expectedHeader := make(http.Header) + expectedHeader.Set("content-type", "application/x-protobuf") + 
expectedHeader.Set("x-hcp-resource-id", "organization/test-org/project/test-project/test-type/test-id") + expectedHeader.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) + require.Equal(t, expectedHeader, provider.GetHeader()) + }) + } +} + // mockRaceClient is a mock HCP client that fetches TelemetryConfig. // The mock TelemetryConfig returned can be manually updated at any time. // It manages concurrent read/write access to config with a sync.RWMutex. @@ -335,7 +446,9 @@ func TestTelemetryConfigProvider_Race(t *testing.T) { } // Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval. - provider := NewHCPProvider(ctx, m) + provider := NewHCPProvider(ctx) + err = provider.Run(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}}) + require.NoError(t, err) for count := 0; count < testRaceWriteSampleCount; count++ { // Force a TelemetryConfig value change in the mockRaceClient.
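The sketch below is not part of the diff; it summarizes how the refactored pieces are wired together after this change, using only the functions introduced above: the provider is created eagerly with disabled defaults, the metrics client pulls its retryable HTTP client and headers from the provider at export time, and the HCP manager later starts the provider with the authenticated client and cloud config. The package name, imports, and the hcpC/cloudCfg arguments are assumptions for illustration, not code from this patch.

// Illustrative wiring only; package name and inputs are assumed, the called
// functions (NewHCPProvider, NewMetricsClient, HCPProviderCfg, Run) are the
// ones introduced in this diff.
package hcpwire

import (
	"context"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/hcp"
	hcpclient "github.com/hashicorp/consul/agent/hcp/client"
	"github.com/hashicorp/consul/agent/hcp/config"
)

func wireTelemetry(ctx context.Context, hcpC hcpclient.Client, cloudCfg config.CloudConfig) {
	// 1. Create the provider up front. It starts with the disabled default
	//    config and no HCP client, so it can safely back the metrics sink
	//    before the cluster is linked to HCP.
	provider := hcp.NewHCPProvider(ctx)

	// 2. The metrics client no longer builds its own HTTP client or headers;
	//    it asks the provider (a MetricsClientProvider) for them on each export.
	metricsClient := hcpclient.NewMetricsClient(ctx, provider)
	_ = metricsClient // handed to the telemetry sink in deps.go

	// 3. When the HCP manager runs, it supplies the authenticated client and
	//    cloud config; Run builds the retryable HTTP client plus headers and
	//    starts the periodic telemetry-config fetch.
	if err := provider.Run(ctx, &hcp.HCPProviderCfg{
		HCPClient: hcpC,
		HCPConfig: &cloudCfg,
	}); err != nil {
		hclog.Default().Error("failed to start telemetry provider", "error", err)
	}
}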