mirror of https://github.com/hashicorp/consul
Co-authored-by: chappie <6537530+chapmanc@users.noreply.github.com>pull/17728/head^2
parent
aa4b01adc4
commit
404bc0f091
|
@ -984,7 +984,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
|||
AutoEncryptIPSAN: autoEncryptIPSAN,
|
||||
AutoEncryptAllowTLS: autoEncryptAllowTLS,
|
||||
AutoConfig: autoConfig,
|
||||
Cloud: b.cloudConfigVal(c.Cloud),
|
||||
Cloud: b.cloudConfigVal(c),
|
||||
ConnectEnabled: connectEnabled,
|
||||
ConnectCAProvider: connectCAProvider,
|
||||
ConnectCAConfig: connectCAConfig,
|
||||
|
@ -2541,21 +2541,26 @@ func validateAutoConfigAuthorizer(rt RuntimeConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *builder) cloudConfigVal(v *CloudConfigRaw) hcpconfig.CloudConfig {
|
||||
func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig {
|
||||
val := hcpconfig.CloudConfig{
|
||||
ResourceID: os.Getenv("HCP_RESOURCE_ID"),
|
||||
}
|
||||
if v == nil {
|
||||
// Node id might get overriden in setup.go:142
|
||||
nodeID := stringVal(v.NodeID)
|
||||
val.NodeID = types.NodeID(nodeID)
|
||||
val.NodeName = b.nodeName(v.NodeName)
|
||||
|
||||
if v.Cloud == nil {
|
||||
return val
|
||||
}
|
||||
|
||||
val.ClientID = stringVal(v.ClientID)
|
||||
val.ClientSecret = stringVal(v.ClientSecret)
|
||||
val.AuthURL = stringVal(v.AuthURL)
|
||||
val.Hostname = stringVal(v.Hostname)
|
||||
val.ScadaAddress = stringVal(v.ScadaAddress)
|
||||
val.ClientID = stringVal(v.Cloud.ClientID)
|
||||
val.ClientSecret = stringVal(v.Cloud.ClientSecret)
|
||||
val.AuthURL = stringVal(v.Cloud.AuthURL)
|
||||
val.Hostname = stringVal(v.Cloud.Hostname)
|
||||
val.ScadaAddress = stringVal(v.Cloud.ScadaAddress)
|
||||
|
||||
if resourceID := stringVal(v.ResourceID); resourceID != "" {
|
||||
if resourceID := stringVal(v.Cloud.ResourceID); resourceID != "" {
|
||||
val.ResourceID = resourceID
|
||||
}
|
||||
return val
|
||||
|
|
|
@ -619,6 +619,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
rt.NodeName = "a"
|
||||
rt.TLS.NodeName = "a"
|
||||
rt.DataDir = dataDir
|
||||
rt.Cloud.NodeName = "a"
|
||||
},
|
||||
})
|
||||
run(t, testCase{
|
||||
|
@ -630,6 +631,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
expected: func(rt *RuntimeConfig) {
|
||||
rt.NodeID = "a"
|
||||
rt.DataDir = dataDir
|
||||
rt.Cloud.NodeID = "a"
|
||||
},
|
||||
})
|
||||
run(t, testCase{
|
||||
|
@ -2319,6 +2321,8 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
rt.Cloud = hcpconfig.CloudConfig{
|
||||
// ID is only populated from env if not populated from other sources.
|
||||
ResourceID: "env-id",
|
||||
NodeName: "thehostname",
|
||||
NodeID: "",
|
||||
}
|
||||
|
||||
// server things
|
||||
|
@ -2359,6 +2363,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
rt.Cloud = hcpconfig.CloudConfig{
|
||||
// ID is only populated from env if not populated from other sources.
|
||||
ResourceID: "file-id",
|
||||
NodeName: "thehostname",
|
||||
}
|
||||
|
||||
// server things
|
||||
|
@ -6317,6 +6322,8 @@ func TestLoad_FullConfig(t *testing.T) {
|
|||
Hostname: "DH4bh7aC",
|
||||
AuthURL: "332nCdR2",
|
||||
ScadaAddress: "aoeusth232",
|
||||
NodeID: types.NodeID("AsUIlw99"),
|
||||
NodeName: "otlLxGaI",
|
||||
},
|
||||
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
|
||||
DNSARecordLimit: 29907,
|
||||
|
|
|
@ -134,7 +134,9 @@
|
|||
"ManagementToken": "hidden",
|
||||
"ResourceID": "cluster1",
|
||||
"ScadaAddress": "",
|
||||
"TLSConfig": null
|
||||
"TLSConfig": null,
|
||||
"NodeID": "",
|
||||
"NodeName": ""
|
||||
},
|
||||
"ConfigEntryBootstrap": [],
|
||||
"ConnectCAConfig": {},
|
||||
|
|
|
@ -313,9 +313,14 @@ func (t *TelemetryConfig) Enabled() (string, bool) {
|
|||
}
|
||||
|
||||
// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
|
||||
func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string {
|
||||
labels := map[string]string{
|
||||
"node_id": nodeID, // used to delineate Consul nodes in graphs
|
||||
func (t *TelemetryConfig) DefaultLabels(cfg config.CloudConfig) map[string]string {
|
||||
labels := make(map[string]string)
|
||||
nodeID := string(cfg.NodeID)
|
||||
if nodeID != "" {
|
||||
labels["node_id"] = nodeID
|
||||
}
|
||||
if cfg.NodeName != "" {
|
||||
labels["node_name"] = cfg.NodeName
|
||||
}
|
||||
|
||||
for k, v := range t.Labels {
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
|
||||
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -147,3 +149,53 @@ func TestConvertTelemetryConfig(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_DefaultLabels(t *testing.T) {
|
||||
for name, tc := range map[string]struct {
|
||||
cfg config.CloudConfig
|
||||
expectedLabels map[string]string
|
||||
}{
|
||||
"Success": {
|
||||
cfg: config.CloudConfig{
|
||||
NodeID: types.NodeID("nodeyid"),
|
||||
NodeName: "nodey",
|
||||
},
|
||||
expectedLabels: map[string]string{
|
||||
"node_id": "nodeyid",
|
||||
"node_name": "nodey",
|
||||
},
|
||||
},
|
||||
|
||||
"NoNodeID": {
|
||||
cfg: config.CloudConfig{
|
||||
NodeID: types.NodeID(""),
|
||||
NodeName: "nodey",
|
||||
},
|
||||
expectedLabels: map[string]string{
|
||||
"node_name": "nodey",
|
||||
},
|
||||
},
|
||||
"NoNodeName": {
|
||||
cfg: config.CloudConfig{
|
||||
NodeID: types.NodeID("nodeyid"),
|
||||
NodeName: "",
|
||||
},
|
||||
expectedLabels: map[string]string{
|
||||
"node_id": "nodeyid",
|
||||
},
|
||||
},
|
||||
"Empty": {
|
||||
cfg: config.CloudConfig{
|
||||
NodeID: "",
|
||||
NodeName: "",
|
||||
},
|
||||
expectedLabels: map[string]string{},
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
tCfg := &TelemetryConfig{}
|
||||
labels := tCfg.DefaultLabels(tc.cfg)
|
||||
require.Equal(t, labels, tc.expectedLabels)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ type otlpClient struct {
|
|||
|
||||
// NewMetricsClient returns a configured MetricsClient.
|
||||
// The current implementation uses otlpClient to provide retry functionality.
|
||||
func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) {
|
||||
func NewMetricsClient(ctx context.Context, cfg CloudConfig) (MetricsClient, error) {
|
||||
if cfg == nil {
|
||||
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ func TestNewMetricsClient(t *testing.T) {
|
|||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
client, err := NewMetricsClient(test.cfg, test.ctx)
|
||||
client, err := NewMetricsClient(test.ctx, test.cfg)
|
||||
if test.wantErr != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), test.wantErr)
|
||||
|
@ -118,7 +118,7 @@ func TestExportMetrics(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
client, err := NewMetricsClient(MockCloudCfg{}, context.Background())
|
||||
client, err := NewMetricsClient(context.Background(), MockCloudCfg{})
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
package client
|
||||
|
||||
type MockMetricsClient struct {
|
||||
MetricsClient
|
||||
}
|
|
@ -6,6 +6,7 @@ package config
|
|||
import (
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/hashicorp/consul/types"
|
||||
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
|
||||
"github.com/hashicorp/hcp-sdk-go/resource"
|
||||
)
|
||||
|
@ -25,6 +26,9 @@ type CloudConfig struct {
|
|||
|
||||
// TlsConfig for testing.
|
||||
TLSConfig *tls.Config
|
||||
|
||||
NodeID types.NodeID
|
||||
NodeName string
|
||||
}
|
||||
|
||||
func (c *CloudConfig) WithTLSConfig(cfg *tls.Config) {
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/scada"
|
||||
"github.com/hashicorp/consul/agent/hcp/telemetry"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
|
@ -25,10 +24,13 @@ type Deps struct {
|
|||
Sink metrics.MetricSink
|
||||
}
|
||||
|
||||
func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (Deps, error) {
|
||||
func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
|
||||
ctx := context.Background()
|
||||
ctx = hclog.WithContext(ctx, logger)
|
||||
|
||||
client, err := hcpclient.NewClient(cfg)
|
||||
if err != nil {
|
||||
return Deps{}, fmt.Errorf("failed to init client: %w:", err)
|
||||
return Deps{}, fmt.Errorf("failed to init client: %w", err)
|
||||
}
|
||||
|
||||
provider, err := scada.New(cfg, logger.Named("scada"))
|
||||
|
@ -36,7 +38,13 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (
|
|||
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
|
||||
}
|
||||
|
||||
sink := sink(client, &cfg, logger.Named("sink"), nodeID)
|
||||
metricsClient, err := hcpclient.NewMetricsClient(ctx, &cfg)
|
||||
if err != nil {
|
||||
logger.Error("failed to init metrics client", "error", err)
|
||||
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
|
||||
}
|
||||
|
||||
sink := sink(ctx, client, metricsClient, cfg)
|
||||
|
||||
return Deps{
|
||||
Client: client,
|
||||
|
@ -48,10 +56,13 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (
|
|||
// sink provides initializes an OTELSink which forwards Consul metrics to HCP.
|
||||
// The sink is only initialized if the server is registered with the management plane (CCM).
|
||||
// This step should not block server initialization, so errors are logged, but not returned.
|
||||
func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger, nodeID types.NodeID) metrics.MetricSink {
|
||||
ctx := context.Background()
|
||||
ctx = hclog.WithContext(ctx, logger)
|
||||
|
||||
func sink(
|
||||
ctx context.Context,
|
||||
hcpClient hcpclient.Client,
|
||||
metricsClient hcpclient.MetricsClient,
|
||||
cfg config.CloudConfig,
|
||||
) metrics.MetricSink {
|
||||
logger := hclog.FromContext(ctx).Named("sink")
|
||||
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
@ -72,16 +83,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
|
|||
return nil
|
||||
}
|
||||
|
||||
metricsClient, err := hcpclient.NewMetricsClient(cfg, ctx)
|
||||
if err != nil {
|
||||
logger.Error("failed to init metrics client", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
sinkOpts := &telemetry.OTELSinkOpts{
|
||||
Ctx: ctx,
|
||||
Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval),
|
||||
Labels: telemetryCfg.DefaultLabels(string(nodeID)),
|
||||
Labels: telemetryCfg.DefaultLabels(cfg),
|
||||
Filters: telemetryCfg.MetricsConfig.Filters,
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package hcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -16,7 +17,7 @@ func TestSink(t *testing.T) {
|
|||
t.Parallel()
|
||||
for name, test := range map[string]struct {
|
||||
expect func(*client.MockClient)
|
||||
mockCloudCfg client.CloudConfig
|
||||
cloudCfg config.CloudConfig
|
||||
expectedSink bool
|
||||
}{
|
||||
"success": {
|
||||
|
@ -28,7 +29,10 @@ func TestSink(t *testing.T) {
|
|||
},
|
||||
}, nil)
|
||||
},
|
||||
mockCloudCfg: client.MockCloudCfg{},
|
||||
cloudCfg: config.CloudConfig{
|
||||
NodeID: types.NodeID("nodeyid"),
|
||||
NodeName: "nodey",
|
||||
},
|
||||
expectedSink: true,
|
||||
},
|
||||
"noSinkWhenServerNotRegisteredWithCCM": {
|
||||
|
@ -40,26 +44,13 @@ func TestSink(t *testing.T) {
|
|||
},
|
||||
}, nil)
|
||||
},
|
||||
mockCloudCfg: client.MockCloudCfg{},
|
||||
cloudCfg: config.CloudConfig{},
|
||||
},
|
||||
"noSinkWhenCCMVerificationFails": {
|
||||
expect: func(mockClient *client.MockClient) {
|
||||
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
|
||||
},
|
||||
mockCloudCfg: client.MockCloudCfg{},
|
||||
},
|
||||
"noSinkWhenMetricsClientInitFails": {
|
||||
mockCloudCfg: client.MockCloudCfg{
|
||||
ConfigErr: fmt.Errorf("test bad hcp config"),
|
||||
},
|
||||
expect: func(mockClient *client.MockClient) {
|
||||
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
|
||||
Endpoint: "https://test.com",
|
||||
MetricsConfig: &client.MetricsConfig{
|
||||
Endpoint: "",
|
||||
},
|
||||
}, nil)
|
||||
},
|
||||
cloudCfg: config.CloudConfig{},
|
||||
},
|
||||
"failsWithFetchTelemetryFailure": {
|
||||
expect: func(mockClient *client.MockClient) {
|
||||
|
@ -93,14 +84,17 @@ func TestSink(t *testing.T) {
|
|||
t.Run(name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
c := client.NewMockClient(t)
|
||||
l := hclog.NewNullLogger()
|
||||
mc := client.MockMetricsClient{}
|
||||
|
||||
test.expect(c)
|
||||
sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234"))
|
||||
ctx := context.Background()
|
||||
|
||||
s := sink(ctx, c, mc, test.cloudCfg)
|
||||
if !test.expectedSink {
|
||||
require.Nil(t, sinkOpts)
|
||||
require.Nil(t, s)
|
||||
return
|
||||
}
|
||||
require.NotNil(t, sinkOpts)
|
||||
require.NotNil(t, s)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,7 +138,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
|
|||
|
||||
var extraSinks []metrics.MetricSink
|
||||
if cfg.IsCloudEnabled() {
|
||||
d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.NodeID)
|
||||
// This values is set late within newNodeIDFromConfig above
|
||||
cfg.Cloud.NodeID = cfg.NodeID
|
||||
|
||||
d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"))
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue