mirror of https://github.com/hashicorp/consul
[CC-7042] Update and enable the HCP metrics sink in the HCP manager (#20072)
* Option to set HCP client at runtime Allows us to initially set a nil HCP client for the telemetry provider and update it later. * Set telemetry provider HCP client in HCP manager Set the telemetry provider as a dependency and pass it to the manager. Update the telemetry provider's HCP client when the HCP manager starts. * Add a provider interface for the metrics client This provider will allow us to configure and reconfigure the retryable HTTP client and the headers for the metrics client. * Move HTTP retryable client to separate file Copied directly from the metrics client. * Abstract HCP specific values in HTTP client Remove HCP specific references and instead initiate with a generic TLS configuration and authentication source. * Set up HTTP client and headers in the provider Move setup from the metrics client to the HCP telemetry provider. * Update the telemetry provider in the HCP manager Initialize the provider without the HCP configs and then update it in the HCP manager to enable it. * Improve test assertion, fix method comment * Move client provider to metrics client * Stop the manager on setup error * Add separate lock for http configuration * Start telemetry provider in HCP manager * Update HCP client and config as part of Run * Remove option to set config at initialization * Simplify and clean up setting HCP configs * Add test for telemetry provider Run method * Fix race condition * Use clone of HTTP headers * Only allow initial update and run oncepull/20107/head^2
parent
6bcc5c148c
commit
c112a6632d
|
@ -582,11 +582,12 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
|
|||
})
|
||||
|
||||
s.hcpManager = hcp.NewManager(hcp.ManagerConfig{
|
||||
CloudConfig: s.config.Cloud,
|
||||
Client: flat.HCP.Client,
|
||||
StatusFn: s.hcpServerStatus(flat),
|
||||
Logger: logger.Named("hcp_manager"),
|
||||
SCADAProvider: flat.HCP.Provider,
|
||||
CloudConfig: s.config.Cloud,
|
||||
Client: flat.HCP.Client,
|
||||
StatusFn: s.hcpServerStatus(flat),
|
||||
Logger: logger.Named("hcp_manager"),
|
||||
SCADAProvider: flat.HCP.Provider,
|
||||
TelemetryProvider: flat.HCP.TelemetryProvider,
|
||||
})
|
||||
|
||||
var recorder *middleware.RequestRecorder
|
||||
|
@ -931,7 +932,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
|
|||
go s.updateMetrics()
|
||||
|
||||
// Now we are setup, configure the HCP manager
|
||||
go s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
|
||||
go func() {
|
||||
err := s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
|
||||
if err != nil {
|
||||
logger.Error("error starting HCP manager, some HashiCorp Cloud Platform functionality has been disabled",
|
||||
"error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = s.runEnterpriseRateLimiterConfigEntryController()
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
const (
|
||||
// HTTP Client config
|
||||
defaultStreamTimeout = 15 * time.Second
|
||||
|
||||
// Retry config
|
||||
// TODO: Eventually, we'd like to configure these values dynamically.
|
||||
defaultRetryWaitMin = 1 * time.Second
|
||||
defaultRetryWaitMax = 15 * time.Second
|
||||
// defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible.
|
||||
// This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now.
|
||||
defaultRetryMax = 0
|
||||
)
|
||||
|
||||
// NewHTTPClient configures the retryable HTTP client.
|
||||
func NewHTTPClient(tlsCfg *tls.Config, source oauth2.TokenSource, logger hclog.Logger) *retryablehttp.Client {
|
||||
tlsTransport := cleanhttp.DefaultPooledTransport()
|
||||
tlsTransport.TLSClientConfig = tlsCfg
|
||||
|
||||
var transport http.RoundTripper = &oauth2.Transport{
|
||||
Base: tlsTransport,
|
||||
Source: source,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: defaultStreamTimeout,
|
||||
}
|
||||
|
||||
retryClient := &retryablehttp.Client{
|
||||
HTTPClient: client,
|
||||
Logger: logger,
|
||||
RetryWaitMin: defaultRetryWaitMin,
|
||||
RetryWaitMax: defaultRetryWaitMax,
|
||||
RetryMax: defaultRetryMax,
|
||||
CheckRetry: retryablehttp.DefaultRetryPolicy,
|
||||
Backoff: retryablehttp.DefaultBackoff,
|
||||
}
|
||||
|
||||
return retryClient
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewHTTPClient(t *testing.T) {
|
||||
mockCfg := config.MockCloudCfg{}
|
||||
mockHCPCfg, err := mockCfg.HCPConfig()
|
||||
require.NoError(t, err)
|
||||
|
||||
client := NewHTTPClient(mockHCPCfg.APITLSConfig(), mockHCPCfg, hclog.NewNullLogger())
|
||||
require.NotNil(t, client)
|
||||
|
||||
var req *http.Request
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
req = r
|
||||
}))
|
||||
_, err = client.Get(srv.URL)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Bearer test-token", req.Header.Get("Authorization"))
|
||||
}
|
|
@ -6,126 +6,55 @@ package client
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
|
||||
"github.com/hashicorp/hcp-sdk-go/resource"
|
||||
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
|
||||
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
|
||||
"golang.org/x/oauth2"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/telemetry"
|
||||
"github.com/hashicorp/consul/version"
|
||||
)
|
||||
|
||||
const (
|
||||
// HTTP Client config
|
||||
defaultStreamTimeout = 15 * time.Second
|
||||
|
||||
// Retry config
|
||||
// TODO: Eventually, we'd like to configure these values dynamically.
|
||||
defaultRetryWaitMin = 1 * time.Second
|
||||
defaultRetryWaitMax = 15 * time.Second
|
||||
// defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible.
|
||||
// This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now.
|
||||
defaultRetryMax = 0
|
||||
|
||||
// defaultErrRespBodyLength refers to the max character length of the body on a failure to export metrics.
|
||||
// anything beyond we will truncate.
|
||||
defaultErrRespBodyLength = 100
|
||||
)
|
||||
|
||||
// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
|
||||
type CloudConfig interface {
|
||||
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
|
||||
Resource() (resource.Resource, error)
|
||||
// MetricsClientProvider provides the retryable HTTP client and headers to use for exporting metrics
|
||||
// by the metrics client.
|
||||
type MetricsClientProvider interface {
|
||||
GetHTTPClient() *retryablehttp.Client
|
||||
GetHeader() http.Header
|
||||
}
|
||||
|
||||
// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
|
||||
// It also holds default HTTP headers to add to export requests.
|
||||
type otlpClient struct {
|
||||
client *retryablehttp.Client
|
||||
header *http.Header
|
||||
provider MetricsClientProvider
|
||||
}
|
||||
|
||||
// NewMetricsClient returns a configured MetricsClient.
|
||||
// The current implementation uses otlpClient to provide retry functionality.
|
||||
func NewMetricsClient(ctx context.Context, cfg CloudConfig) (telemetry.MetricsClient, error) {
|
||||
if cfg == nil {
|
||||
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
|
||||
}
|
||||
|
||||
if ctx == nil {
|
||||
return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
|
||||
}
|
||||
|
||||
logger := hclog.FromContext(ctx)
|
||||
|
||||
c, err := newHTTPClient(cfg, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
|
||||
}
|
||||
|
||||
r, err := cfg.Resource()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
|
||||
}
|
||||
|
||||
header := make(http.Header)
|
||||
header.Set("content-type", "application/x-protobuf")
|
||||
header.Set("x-hcp-resource-id", r.String())
|
||||
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))
|
||||
|
||||
func NewMetricsClient(ctx context.Context, provider MetricsClientProvider) telemetry.MetricsClient {
|
||||
return &otlpClient{
|
||||
client: c,
|
||||
header: &header,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// newHTTPClient configures the retryable HTTP client.
|
||||
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
|
||||
hcpCfg, err := cloudCfg.HCPConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
provider: provider,
|
||||
}
|
||||
|
||||
tlsTransport := cleanhttp.DefaultPooledTransport()
|
||||
tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig()
|
||||
|
||||
var transport http.RoundTripper = &oauth2.Transport{
|
||||
Base: tlsTransport,
|
||||
Source: hcpCfg,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: defaultStreamTimeout,
|
||||
}
|
||||
|
||||
retryClient := &retryablehttp.Client{
|
||||
HTTPClient: client,
|
||||
Logger: logger.Named("hcp_telemetry_client"),
|
||||
RetryWaitMin: defaultRetryWaitMin,
|
||||
RetryWaitMax: defaultRetryWaitMax,
|
||||
RetryMax: defaultRetryMax,
|
||||
CheckRetry: retryablehttp.DefaultRetryPolicy,
|
||||
Backoff: retryablehttp.DefaultBackoff,
|
||||
}
|
||||
|
||||
return retryClient, nil
|
||||
}
|
||||
|
||||
// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
|
||||
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
|
||||
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
|
||||
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
|
||||
client := o.provider.GetHTTPClient()
|
||||
if client == nil {
|
||||
return errors.New("http client not configured")
|
||||
}
|
||||
|
||||
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
|
||||
}
|
||||
|
@ -139,9 +68,9 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header = *o.header
|
||||
req.Header = o.provider.GetHeader()
|
||||
|
||||
resp, err := o.client.Do(req.WithContext(ctx))
|
||||
resp, err := client.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to post metrics: %w", err)
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
@ -16,55 +15,26 @@ import (
|
|||
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/version"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
)
|
||||
|
||||
func TestNewMetricsClient(t *testing.T) {
|
||||
for name, test := range map[string]struct {
|
||||
wantErr string
|
||||
cfg CloudConfig
|
||||
ctx context.Context
|
||||
}{
|
||||
"success": {
|
||||
cfg: &MockCloudCfg{},
|
||||
ctx: context.Background(),
|
||||
},
|
||||
"failsWithoutCloudCfg": {
|
||||
wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)",
|
||||
cfg: nil,
|
||||
ctx: context.Background(),
|
||||
},
|
||||
"failsWithoutContext": {
|
||||
wantErr: "failed to init telemetry client: provide a valid context",
|
||||
cfg: MockCloudCfg{},
|
||||
ctx: nil,
|
||||
},
|
||||
"failsHCPConfig": {
|
||||
wantErr: "failed to init telemetry client",
|
||||
cfg: MockCloudCfg{
|
||||
ConfigErr: fmt.Errorf("test bad hcp config"),
|
||||
},
|
||||
ctx: context.Background(),
|
||||
},
|
||||
"failsBadResource": {
|
||||
wantErr: "failed to init telemetry client",
|
||||
cfg: MockCloudCfg{
|
||||
ResourceErr: fmt.Errorf("test bad resource"),
|
||||
},
|
||||
ctx: context.Background(),
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
client, err := NewMetricsClient(test.ctx, test.cfg)
|
||||
if test.wantErr != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), test.wantErr)
|
||||
return
|
||||
}
|
||||
type mockClientProvider struct {
|
||||
client *retryablehttp.Client
|
||||
header *http.Header
|
||||
}
|
||||
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, client)
|
||||
})
|
||||
func (m *mockClientProvider) GetHTTPClient() *retryablehttp.Client { return m.client }
|
||||
func (m *mockClientProvider) GetHeader() http.Header { return m.header.Clone() }
|
||||
|
||||
func newMockClientProvider() *mockClientProvider {
|
||||
header := make(http.Header)
|
||||
header.Set("content-type", "application/x-protobuf")
|
||||
|
||||
client := retryablehttp.NewClient()
|
||||
|
||||
return &mockClientProvider{
|
||||
header: &header,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,6 +53,7 @@ func TestExportMetrics(t *testing.T) {
|
|||
wantErr string
|
||||
status int
|
||||
largeBodyError bool
|
||||
mutateProvider func(*mockClientProvider)
|
||||
}{
|
||||
"success": {
|
||||
status: http.StatusOK,
|
||||
|
@ -96,14 +67,17 @@ func TestExportMetrics(t *testing.T) {
|
|||
wantErr: "failed to export metrics: code 400",
|
||||
largeBodyError: true,
|
||||
},
|
||||
"failsWithClientNotConfigured": {
|
||||
mutateProvider: func(m *mockClientProvider) {
|
||||
m.client = nil
|
||||
},
|
||||
wantErr: "http client not configured",
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
randomBody := randStringRunes(1000)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf")
|
||||
require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
|
||||
require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion()))
|
||||
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")
|
||||
|
||||
body := colpb.ExportMetricsServiceResponse{}
|
||||
bytes, err := proto.Marshal(&body)
|
||||
|
@ -121,12 +95,15 @@ func TestExportMetrics(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
client, err := NewMetricsClient(context.Background(), MockCloudCfg{})
|
||||
require.NoError(t, err)
|
||||
provider := newMockClientProvider()
|
||||
if test.mutateProvider != nil {
|
||||
test.mutateProvider(provider)
|
||||
}
|
||||
client := NewMetricsClient(context.Background(), provider)
|
||||
|
||||
ctx := context.Background()
|
||||
metrics := &metricpb.ResourceMetrics{}
|
||||
err = client.ExportMetrics(ctx, metrics, srv.URL)
|
||||
err := client.ExportMetrics(ctx, metrics, srv.URL)
|
||||
|
||||
if test.wantErr != "" {
|
||||
require.Error(t, err)
|
||||
|
|
|
@ -11,6 +11,13 @@ import (
|
|||
"github.com/hashicorp/hcp-sdk-go/resource"
|
||||
)
|
||||
|
||||
// CloudConfigurer abstracts the cloud config methods needed to connect to HCP
|
||||
// in an interface for easier testing.
|
||||
type CloudConfigurer interface {
|
||||
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
|
||||
Resource() (resource.Resource, error)
|
||||
}
|
||||
|
||||
// CloudConfig defines configuration for connecting to HCP services
|
||||
type CloudConfig struct {
|
||||
ResourceID string
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package client
|
||||
package config
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
|
@ -18,9 +18,10 @@ import (
|
|||
|
||||
// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
|
||||
type Deps struct {
|
||||
Client client.Client
|
||||
Provider scada.Provider
|
||||
Sink metrics.MetricSink
|
||||
Client client.Client
|
||||
Provider scada.Provider
|
||||
Sink metrics.MetricSink
|
||||
TelemetryProvider *hcpProviderImpl
|
||||
}
|
||||
|
||||
func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
|
||||
|
@ -37,22 +38,25 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
|
|||
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
|
||||
}
|
||||
|
||||
metricsClient, err := client.NewMetricsClient(ctx, &cfg)
|
||||
metricsProvider := NewHCPProvider(ctx)
|
||||
if err != nil {
|
||||
logger.Error("failed to init metrics client", "error", err)
|
||||
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
|
||||
logger.Error("failed to init HCP metrics provider", "error", err)
|
||||
return Deps{}, fmt.Errorf("failed to init HCP metrics provider: %w", err)
|
||||
}
|
||||
|
||||
sink, err := sink(ctx, metricsClient, NewHCPProvider(ctx, hcpClient))
|
||||
metricsClient := client.NewMetricsClient(ctx, metricsProvider)
|
||||
|
||||
sink, err := sink(ctx, metricsClient, metricsProvider)
|
||||
if err != nil {
|
||||
// Do not prevent server start if sink init fails, only log error.
|
||||
logger.Error("failed to init sink", "error", err)
|
||||
}
|
||||
|
||||
return Deps{
|
||||
Client: hcpClient,
|
||||
Provider: provider,
|
||||
Sink: sink,
|
||||
Client: hcpClient,
|
||||
Provider: provider,
|
||||
Sink: sink,
|
||||
TelemetryProvider: metricsProvider,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,10 @@ var (
|
|||
)
|
||||
|
||||
type ManagerConfig struct {
|
||||
Client hcpclient.Client
|
||||
CloudConfig config.CloudConfig
|
||||
SCADAProvider scada.Provider
|
||||
Client hcpclient.Client
|
||||
CloudConfig config.CloudConfig
|
||||
SCADAProvider scada.Provider
|
||||
TelemetryProvider *hcpProviderImpl
|
||||
|
||||
StatusFn StatusCallback
|
||||
MinInterval time.Duration
|
||||
|
@ -66,9 +67,7 @@ type Manager struct {
|
|||
testUpdateSent chan struct{}
|
||||
}
|
||||
|
||||
// NewManager returns an initialized Manager with a zero configuration. It won't
|
||||
// do anything until UpdateConfig is called with a config that provides
|
||||
// credentials to contact HCP.
|
||||
// NewManager returns a Manager initialized with the given configuration.
|
||||
func NewManager(cfg ManagerConfig) *Manager {
|
||||
return &Manager{
|
||||
logger: cfg.Logger,
|
||||
|
@ -83,23 +82,27 @@ func NewManager(cfg ManagerConfig) *Manager {
|
|||
// yet for servers since a config update might configure it later and
|
||||
// UpdateConfig called. It will effectively do nothing if there are no HCP
|
||||
// credentials set other than wait for some to be added.
|
||||
func (m *Manager) Run(ctx context.Context) {
|
||||
func (m *Manager) Run(ctx context.Context) error {
|
||||
var err error
|
||||
m.logger.Debug("HCP manager starting")
|
||||
|
||||
// Update and start the SCADA provider
|
||||
err = m.startSCADAProvider()
|
||||
if err != nil {
|
||||
// Log the error but continue starting the manager. The SCADA provider
|
||||
// could potentially be updated later with a working configuration.
|
||||
m.logger.Error("scada provider failed to start, some HashiCorp Cloud Platform functionality has been disabled",
|
||||
"error", err)
|
||||
m.logger.Error("failed to start scada provider", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update and start the telemetry provider to enable the HCP metrics sink
|
||||
if err := m.startTelemetryProvider(ctx); err != nil {
|
||||
m.logger.Error("failed to update telemetry config provider", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// immediately send initial update
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
return nil
|
||||
case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop
|
||||
err = m.sendUpdate()
|
||||
default:
|
||||
|
@ -118,7 +121,7 @@ func (m *Manager) Run(ctx context.Context) {
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
return nil
|
||||
|
||||
case <-m.updateCh:
|
||||
err = m.sendUpdate()
|
||||
|
@ -153,6 +156,18 @@ func (m *Manager) startSCADAProvider() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) startTelemetryProvider(ctx context.Context) error {
|
||||
if m.cfg.TelemetryProvider == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.cfg.TelemetryProvider.Run(ctx, &HCPProviderCfg{
|
||||
HCPClient: m.cfg.Client,
|
||||
HCPConfig: &m.cfg.CloudConfig,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -38,12 +38,26 @@ func TestManager_Run(t *testing.T) {
|
|||
).Return()
|
||||
scadaM.EXPECT().Start().Return(nil)
|
||||
|
||||
telemetryProvider := &hcpProviderImpl{
|
||||
httpCfg: &httpCfg{},
|
||||
logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
cfg: defaultDisabledCfg(),
|
||||
}
|
||||
|
||||
mockTelemetryCfg, err := testTelemetryCfg(&testConfig{
|
||||
refreshInterval: 1 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
client.EXPECT().FetchTelemetryConfig(mock.Anything).Return(
|
||||
mockTelemetryCfg, nil).Maybe()
|
||||
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
CloudConfig: cloudCfg,
|
||||
SCADAProvider: scadaM,
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
CloudConfig: cloudCfg,
|
||||
SCADAProvider: scadaM,
|
||||
TelemetryProvider: telemetryProvider,
|
||||
})
|
||||
mgr.testUpdateSent = updateCh
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -59,6 +73,9 @@ func TestManager_Run(t *testing.T) {
|
|||
// Make sure after manager has stopped no more statuses are pushed.
|
||||
cancel()
|
||||
client.AssertExpectations(t)
|
||||
require.Equal(t, client, telemetryProvider.hcpClient)
|
||||
require.NotNil(t, telemetryProvider.GetHeader())
|
||||
require.NotNil(t, telemetryProvider.GetHTTPClient())
|
||||
}
|
||||
|
||||
func TestManager_SendUpdate(t *testing.T) {
|
||||
|
|
|
@ -5,7 +5,11 @@ package hcp
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -13,9 +17,12 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/go-openapi/runtime"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/telemetry"
|
||||
"github.com/hashicorp/consul/version"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -30,19 +37,33 @@ var (
|
|||
// Ensure hcpProviderImpl implements telemetry provider interfaces.
|
||||
var _ telemetry.ConfigProvider = &hcpProviderImpl{}
|
||||
var _ telemetry.EndpointProvider = &hcpProviderImpl{}
|
||||
var _ client.MetricsClientProvider = &hcpProviderImpl{}
|
||||
|
||||
// hcpProviderImpl holds telemetry configuration and settings for continuous fetch of new config from HCP.
|
||||
// it updates configuration, if changes are detected.
|
||||
type hcpProviderImpl struct {
|
||||
// cfg holds configuration that can be dynamically updated.
|
||||
cfg *dynamicConfig
|
||||
// httpCfg holds configuration for the HTTP client
|
||||
httpCfg *httpCfg
|
||||
|
||||
// A reader-writer mutex is used as the provider is read heavy.
|
||||
// Reader-writer mutexes are used as the provider is read heavy.
|
||||
// OTEL components access telemetryConfig during metrics collection and export (read).
|
||||
// Meanwhile, config is only updated when there are changes (write).
|
||||
rw sync.RWMutex
|
||||
// Meanwhile, configs are only updated when there are changes (write).
|
||||
rw sync.RWMutex
|
||||
httpCfgRW sync.RWMutex
|
||||
|
||||
// running indicates if the HCP telemetry config provider has been started
|
||||
running bool
|
||||
|
||||
// hcpClient is an authenticated client used to make HTTP requests to HCP.
|
||||
hcpClient client.Client
|
||||
|
||||
// logger is the HCP logger for the provider
|
||||
logger hclog.Logger
|
||||
|
||||
// testUpdateConfigCh is used by unit tests to signal when an update config has occurred
|
||||
testUpdateConfigCh chan struct{}
|
||||
}
|
||||
|
||||
// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
|
||||
|
@ -67,21 +88,56 @@ func defaultDisabledCfg() *dynamicConfig {
|
|||
}
|
||||
}
|
||||
|
||||
// httpCfg is a set of configurable settings for the HTTP client used to export metrics
|
||||
type httpCfg struct {
|
||||
header *http.Header
|
||||
client *retryablehttp.Client
|
||||
}
|
||||
|
||||
type HCPProviderCfg struct {
|
||||
HCPClient client.Client
|
||||
HCPConfig config.CloudConfigurer
|
||||
}
|
||||
|
||||
// NewHCPProvider initializes and starts a HCP Telemetry provider.
|
||||
func NewHCPProvider(ctx context.Context, hcpClient client.Client) *hcpProviderImpl {
|
||||
func NewHCPProvider(ctx context.Context) *hcpProviderImpl {
|
||||
h := &hcpProviderImpl{
|
||||
// Initialize with default config values.
|
||||
cfg: defaultDisabledCfg(),
|
||||
hcpClient: hcpClient,
|
||||
cfg: defaultDisabledCfg(),
|
||||
httpCfg: &httpCfg{},
|
||||
logger: hclog.FromContext(ctx),
|
||||
}
|
||||
|
||||
go h.run(ctx)
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// run continously checks for updates to the telemetry configuration by making a request to HCP.
|
||||
func (h *hcpProviderImpl) run(ctx context.Context) {
|
||||
// Run starts a process that continuously checks for updates to the telemetry configuration
|
||||
// by making a request to HCP. It only starts running if it's not already running.
|
||||
func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error {
|
||||
if h.isRunning() {
|
||||
return nil
|
||||
}
|
||||
|
||||
h.rw.Lock()
|
||||
h.running = true
|
||||
h.rw.Unlock()
|
||||
|
||||
// Update the provider with the HCP configurations
|
||||
h.hcpClient = c.HCPClient
|
||||
err := h.updateHTTPConfig(c.HCPConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize HCP telemetry provider: %v", err)
|
||||
}
|
||||
|
||||
go h.run(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// run continuously checks for updates to the telemetry configuration by making a request to HCP.
|
||||
func (h *hcpProviderImpl) run(ctx context.Context) error {
|
||||
h.logger.Debug("starting telemetry config provider")
|
||||
|
||||
// Try to initialize config once before starting periodic fetch.
|
||||
h.updateConfig(ctx)
|
||||
|
||||
|
@ -94,18 +150,35 @@ func (h *hcpProviderImpl) run(ctx context.Context) {
|
|||
ticker.Reset(newRefreshInterval)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateConfig makes a HTTP request to HCP to update metrics configuration held in the provider.
|
||||
func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration {
|
||||
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")
|
||||
logger := h.logger.Named("telemetry_config_provider")
|
||||
|
||||
if h.testUpdateConfigCh != nil {
|
||||
defer func() {
|
||||
select {
|
||||
case h.testUpdateConfigCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if h.hcpClient == nil || reflect.ValueOf(h.hcpClient).IsNil() {
|
||||
// Disable metrics if HCP client is not configured
|
||||
disabledMetricsCfg := defaultDisabledCfg()
|
||||
h.modifyDynamicCfg(disabledMetricsCfg)
|
||||
return disabledMetricsCfg.refreshInterval
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
logger.Trace("fetching telemetry config")
|
||||
telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx)
|
||||
if err != nil {
|
||||
// Only disable metrics on 404 or 401 to handle the case of an unlinked cluster.
|
||||
|
@ -121,6 +194,7 @@ func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration {
|
|||
metrics.IncrCounter(internalMetricRefreshFailure, 1)
|
||||
return 0
|
||||
}
|
||||
logger.Trace("successfully fetched telemetry config")
|
||||
|
||||
// newRefreshInterval of 0 or less can cause ticker Reset() panic.
|
||||
newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
|
||||
|
@ -183,3 +257,66 @@ func (h *hcpProviderImpl) IsDisabled() bool {
|
|||
|
||||
return h.cfg.disabled
|
||||
}
|
||||
|
||||
// updateHTTPConfig updates the HTTP configuration values that rely on the HCP configuration.
|
||||
func (h *hcpProviderImpl) updateHTTPConfig(cfg config.CloudConfigurer) error {
|
||||
h.httpCfgRW.Lock()
|
||||
defer h.httpCfgRW.Unlock()
|
||||
|
||||
if cfg == nil {
|
||||
return errors.New("must provide valid HCP configuration")
|
||||
}
|
||||
|
||||
// Update headers
|
||||
r, err := cfg.Resource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed set telemetry client headers: %v", err)
|
||||
}
|
||||
header := make(http.Header)
|
||||
header.Set("content-type", "application/x-protobuf")
|
||||
header.Set("x-hcp-resource-id", r.String())
|
||||
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))
|
||||
h.httpCfg.header = &header
|
||||
|
||||
// Update HTTP client
|
||||
hcpCfg, err := cfg.HCPConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to configure telemetry HTTP client: %v", err)
|
||||
}
|
||||
h.httpCfg.client = client.NewHTTPClient(
|
||||
hcpCfg.APITLSConfig(),
|
||||
hcpCfg,
|
||||
h.logger.Named("hcp_telemetry_client"))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetHeader acquires a read lock to return the HTTP request headers needed
|
||||
// to export metrics.
|
||||
func (h *hcpProviderImpl) GetHeader() http.Header {
|
||||
h.httpCfgRW.RLock()
|
||||
defer h.httpCfgRW.RUnlock()
|
||||
|
||||
if h.httpCfg.header == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return h.httpCfg.header.Clone()
|
||||
}
|
||||
|
||||
// GetHTTPClient acquires a read lock to return the retryable HTTP client needed
|
||||
// to export metrics.
|
||||
func (h *hcpProviderImpl) GetHTTPClient() *retryablehttp.Client {
|
||||
h.httpCfgRW.RLock()
|
||||
defer h.httpCfgRW.RUnlock()
|
||||
|
||||
return h.httpCfg.client
|
||||
}
|
||||
|
||||
// isRunning acquires a read lock to return whether the provider is running.
|
||||
func (h *hcpProviderImpl) isRunning() bool {
|
||||
h.rw.RLock()
|
||||
defer h.rw.RUnlock()
|
||||
|
||||
return h.running
|
||||
}
|
||||
|
|
|
@ -5,8 +5,8 @@ package hcp
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
@ -21,6 +21,9 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/version"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -49,11 +52,8 @@ func TestNewTelemetryConfigProvider_DefaultConfig(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize new provider, but fail all HCP fetches.
|
||||
mc := client.NewMockClient(t)
|
||||
mc.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, errors.New("failed to fetch config"))
|
||||
|
||||
provider := NewHCPProvider(ctx, mc)
|
||||
// Initialize new provider
|
||||
provider := NewHCPProvider(ctx)
|
||||
provider.updateConfig(ctx)
|
||||
|
||||
// Assert provider has default configuration and metrics processing is disabled.
|
||||
|
@ -74,6 +74,7 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
|
|||
initCfg *dynamicConfig
|
||||
expected *dynamicConfig
|
||||
expectedInterval time.Duration
|
||||
skipHCPClient bool
|
||||
}{
|
||||
"noChanges": {
|
||||
initCfg: testDynamicCfg(&testConfig{
|
||||
|
@ -236,16 +237,34 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
|
|||
metricKey: testMetricKeySuccess,
|
||||
expectedInterval: defaultTelemetryConfigRefreshInterval,
|
||||
},
|
||||
"hcpClientNotConfigured": {
|
||||
skipHCPClient: true,
|
||||
initCfg: testDynamicCfg(&testConfig{
|
||||
endpoint: "http://test.com/v1/metrics",
|
||||
filters: "test",
|
||||
labels: map[string]string{
|
||||
"test_label": "123",
|
||||
},
|
||||
refreshInterval: testRefreshInterval,
|
||||
}),
|
||||
expected: defaultDisabledCfg(),
|
||||
metricKey: testMetricKeySuccess,
|
||||
expectedInterval: defaultTelemetryConfigRefreshInterval,
|
||||
},
|
||||
} {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sink := initGlobalSink()
|
||||
mockClient := client.NewMockClient(t)
|
||||
tc.mockExpect(mockClient)
|
||||
var mockClient *client.MockClient
|
||||
if !tc.skipHCPClient {
|
||||
mockClient = client.NewMockClient(t)
|
||||
tc.mockExpect(mockClient)
|
||||
}
|
||||
|
||||
provider := &hcpProviderImpl{
|
||||
hcpClient: mockClient,
|
||||
cfg: tc.initCfg,
|
||||
logger: hclog.NewNullLogger(),
|
||||
}
|
||||
|
||||
newInterval := provider.updateConfig(context.Background())
|
||||
|
@ -267,6 +286,98 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTelemetryConfigProvider_Run(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
provider := NewHCPProvider(ctx)
|
||||
|
||||
testUpdateConfigCh := make(chan struct{}, 1)
|
||||
provider.testUpdateConfigCh = testUpdateConfigCh
|
||||
|
||||
// Configure mocks
|
||||
mockClient := client.NewMockClient(t)
|
||||
mTelemetryCfg, err := testTelemetryCfg(&testConfig{
|
||||
endpoint: "http://test.com/v1/metrics",
|
||||
filters: "test",
|
||||
labels: map[string]string{
|
||||
"test_label": "123",
|
||||
},
|
||||
refreshInterval: testRefreshInterval,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil)
|
||||
mockHCPCfg := &config.MockCloudCfg{}
|
||||
|
||||
// Run provider
|
||||
go provider.Run(context.Background(), &HCPProviderCfg{
|
||||
HCPClient: mockClient,
|
||||
HCPConfig: mockHCPCfg,
|
||||
})
|
||||
|
||||
var count int
|
||||
select {
|
||||
case <-testUpdateConfigCh:
|
||||
// Expect/wait for at least two update config calls
|
||||
count++
|
||||
if count > 2 {
|
||||
break
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
require.Fail(t, "provider did not attempt to update config in expected time")
|
||||
}
|
||||
|
||||
mockClient.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) {
|
||||
for name, test := range map[string]struct {
|
||||
wantErr string
|
||||
cfg config.CloudConfigurer
|
||||
}{
|
||||
"success": {
|
||||
cfg: &config.MockCloudCfg{},
|
||||
},
|
||||
"failsWithoutCloudCfg": {
|
||||
wantErr: "must provide valid HCP configuration",
|
||||
cfg: nil,
|
||||
},
|
||||
"failsHCPConfig": {
|
||||
wantErr: "failed to configure telemetry HTTP client",
|
||||
cfg: config.MockCloudCfg{
|
||||
ConfigErr: fmt.Errorf("test bad hcp config"),
|
||||
},
|
||||
},
|
||||
"failsBadResource": {
|
||||
wantErr: "failed set telemetry client headers",
|
||||
cfg: config.MockCloudCfg{
|
||||
ResourceErr: fmt.Errorf("test bad resource"),
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
provider := NewHCPProvider(context.Background())
|
||||
err := provider.updateHTTPConfig(test.cfg)
|
||||
|
||||
if test.wantErr != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), test.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, provider.GetHTTPClient())
|
||||
|
||||
expectedHeader := make(http.Header)
|
||||
expectedHeader.Set("content-type", "application/x-protobuf")
|
||||
expectedHeader.Set("x-hcp-resource-id", "organization/test-org/project/test-project/test-type/test-id")
|
||||
expectedHeader.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))
|
||||
require.Equal(t, expectedHeader, provider.GetHeader())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// mockRaceClient is a mock HCP client that fetches TelemetryConfig.
|
||||
// The mock TelemetryConfig returned can be manually updated at any time.
|
||||
// It manages concurrent read/write access to config with a sync.RWMutex.
|
||||
|
@ -335,7 +446,9 @@ func TestTelemetryConfigProvider_Race(t *testing.T) {
|
|||
}
|
||||
|
||||
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
|
||||
provider := NewHCPProvider(ctx, m)
|
||||
provider := NewHCPProvider(ctx)
|
||||
err = provider.Run(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}})
|
||||
require.NoError(t, err)
|
||||
|
||||
for count := 0; count < testRaceWriteSampleCount; count++ {
|
||||
// Force a TelemetryConfig value change in the mockRaceClient.
|
||||
|
|
Loading…
Reference in New Issue