[CC-7049] Stop the HCP manager when link is deleted (#20351)

* Add Stop method to telemetry provider

Stop the main loop of the provider and set the config
to disabled.

* Add interface for telemetry provider

Added for easier testing. Also renamed Run to Start, which better
fits with Stop.

* Add Stop method to HCP manager

* Add manager interface, rename implementation

Add interface for easier testing, rename existing Manager to HCPManager.

* Stop HCP manager in link Finalizer

* Attempt to cleanup if resource has been deleted

The link should be cleaned up by the finalizer, but there's an edge
case in a multi-server setup where the link is fully deleted on one
server before the other server reconciles. This will cover the case
where the reconcile happens after the resource is deleted.

* Add a delete management token function

Passes a function to the HCP manager that deletes the management token
that was initially created by the manager.

* Delete token as part of stopping the manager

* Lock around disabling config, remove descriptions
pull/20376/head^2
Melissa Kam 2024-01-30 09:40:36 -06:00 committed by GitHub
parent b9ebaeca1c
commit b0e87dbe13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 730 additions and 91 deletions

View File

@ -635,6 +635,38 @@ func (s *Server) upsertManagementToken(name, secretID string) error {
return nil
}
// deleteManagementToken deletes the ACL token identified by the given secret ID,
// but only when it is a management token, i.e. it is linked to exactly one policy
// and that policy is the global-management policy.
//
// It returns nil without applying anything when no token with that secret exists
// (the token is treated as already deleted). It returns an error when the lookup
// fails, when the token is not a management token, or when the raft delete
// request fails.
func (s *Server) deleteManagementToken(secretId string) error {
	state := s.fsm.State()

	// Fetch the token to get its accessor ID and to verify that it's a management token
	_, token, err := state.ACLTokenGetBySecret(nil, secretId, nil)
	if err != nil {
		return fmt.Errorf("failed to get management token: %v", err)
	}

	if token == nil {
		// token is already deleted
		return nil
	}

	// Refuse to delete anything that is not a management token. The checks are
	// joined with || so that a token with a single non-management policy is
	// rejected, and so that Policies[0] is never evaluated for a token that has
	// no policies at all.
	if len(token.Policies) != 1 || token.Policies[0].ID != structs.ACLPolicyGlobalManagementID {
		return fmt.Errorf("failed to delete management token: not a management token")
	}

	// Delete the token. The batch delete request is keyed by accessor ID, not secret ID.
	req := structs.ACLTokenBatchDeleteRequest{
		TokenIDs: []string{token.AccessorID},
	}
	if _, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req); err != nil {
		return fmt.Errorf("failed to delete management token: %v", err)
	}

	s.logger.Info("deleted ACL token", "description", token.Description)

	return nil
}
func (s *Server) insertAnonymousToken() error {
state := s.fsm.State()
_, token, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil)

View File

@ -457,7 +457,7 @@ type Server struct {
xdsCapacityController *xdscapacity.Controller
// hcpManager handles pushing server status updates to the HashiCorp Cloud Platform when enabled
hcpManager *hcp.Manager
hcpManager *hcp.HCPManager
// embedded struct to hold all the enterprise specific data
EnterpriseServer
@ -611,6 +611,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
}
return nil
},
ManagementTokenDeleterFn: func(secretId string) error {
// Check the state of the server before attempting to delete the token. Otherwise,
// the delete will fail and log errors that do not require action from the user.
if s.config.ACLsEnabled && s.IsLeader() && s.InPrimaryDatacenter() {
return s.deleteManagementToken(secretId)
}
return nil
},
})
var recorder *middleware.RequestRecorder

View File

@ -2139,6 +2139,17 @@ func TestServer_hcpManager(t *testing.T) {
require.NoError(r, err)
require.NotNil(r, createdToken)
})
// Stop the HCP manager
err = s1.hcpManager.Stop()
require.NoError(t, err)
// Validate that the HCP token has been deleted as expected
retry.Run(t, func(r *retry.R) {
_, createdToken, err := s1.fsm.State().ACLTokenGetBySecret(nil, token, nil)
require.NoError(r, err)
require.Nil(r, createdToken)
})
}
func TestServer_addServerTLSInfo(t *testing.T) {

View File

@ -5,6 +5,7 @@ package hcp
import (
"context"
"reflect"
"sync"
"time"
@ -20,16 +21,19 @@ var (
defaultManagerMaxInterval = 75 * time.Minute
)
var _ Manager = (*HCPManager)(nil)
type ManagerConfig struct {
Client hcpclient.Client
CloudConfig config.CloudConfig
SCADAProvider scada.Provider
TelemetryProvider *hcpProviderImpl
TelemetryProvider TelemetryProvider
StatusFn StatusCallback
// Idempotent function to upsert the HCP management token. This will be called periodically in
// the manager's main loop.
ManagementTokenUpserterFn ManagementTokenUpserter
ManagementTokenDeleterFn ManagementTokenDeleter
MinInterval time.Duration
MaxInterval time.Duration
@ -58,8 +62,17 @@ func (cfg *ManagerConfig) nextHeartbeat() time.Duration {
// StatusCallback returns the current server status (or an error) for the given
// context. NOTE(review): presumably this is the status the manager pushes to HCP
// in sendUpdate — confirm against the caller.
type StatusCallback func(context.Context) (hcpclient.ServerStatus, error)

// ManagementTokenUpserter upserts the HCP management token with the given name
// and secret ID.
type ManagementTokenUpserter func(name, secretId string) error

// ManagementTokenDeleter deletes the management token with the given secret ID.
// HCPManager.Stop calls it with the management token from the cloud config.
type ManagementTokenDeleter func(secretId string) error
type Manager struct {
//go:generate mockery --name Manager --with-expecter --inpackage
// Manager is the interface to the HCP manager. It is implemented by HCPManager
// and exists so that consumers can be tested against a generated mock.
type Manager interface {
// Start starts the manager. It returns an error if starting fails.
Start(context.Context) error
// Stop stops the manager. It returns an error if stopping fails.
Stop() error
// GetCloudConfig returns the cloud configuration the manager currently holds.
GetCloudConfig() config.CloudConfig
// UpdateConfig hands the manager a new HCP client and cloud configuration.
UpdateConfig(hcpclient.Client, config.CloudConfig)
}
type HCPManager struct {
logger hclog.Logger
running bool
@ -69,14 +82,15 @@ type Manager struct {
cfgMu sync.RWMutex
updateCh chan struct{}
stopCh chan struct{}
// testUpdateSent is set by unit tests to signal when the manager's status update has triggered
testUpdateSent chan struct{}
}
// NewManager returns a Manager initialized with the given configuration.
func NewManager(cfg ManagerConfig) *Manager {
return &Manager{
func NewManager(cfg ManagerConfig) *HCPManager {
return &HCPManager{
logger: cfg.Logger,
cfg: cfg,
@ -86,7 +100,7 @@ func NewManager(cfg ManagerConfig) *Manager {
// Start executes the logic for connecting to HCP and sending periodic server updates. If the
// manager has been previously started, it will not start again.
func (m *Manager) Start(ctx context.Context) error {
func (m *HCPManager) Start(ctx context.Context) error {
// Check if the manager has already started
changed := m.setRunning(true)
if !changed {
@ -117,6 +131,8 @@ func (m *Manager) Start(ctx context.Context) error {
case <-ctx.Done():
m.setRunning(false)
return nil
case <-m.stopCh:
return nil
case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop
err = m.sendUpdate()
if err != nil {
@ -158,6 +174,9 @@ func (m *Manager) Start(ctx context.Context) error {
m.setRunning(false)
return
case <-m.stopCh:
return
case <-m.updateCh:
err = m.sendUpdate()
@ -170,7 +189,7 @@ func (m *Manager) Start(ctx context.Context) error {
return err
}
func (m *Manager) startSCADAProvider() error {
func (m *HCPManager) startSCADAProvider() error {
provider := m.cfg.SCADAProvider
if provider == nil {
return nil
@ -197,12 +216,12 @@ func (m *Manager) startSCADAProvider() error {
return nil
}
func (m *Manager) startTelemetryProvider(ctx context.Context) error {
if m.cfg.TelemetryProvider == nil {
func (m *HCPManager) startTelemetryProvider(ctx context.Context) error {
if m.cfg.TelemetryProvider == nil || reflect.ValueOf(m.cfg.TelemetryProvider).IsNil() {
return nil
}
m.cfg.TelemetryProvider.Run(ctx, &HCPProviderCfg{
m.cfg.TelemetryProvider.Start(ctx, &HCPProviderCfg{
HCPClient: m.cfg.Client,
HCPConfig: &m.cfg.CloudConfig,
})
@ -210,14 +229,14 @@ func (m *Manager) startTelemetryProvider(ctx context.Context) error {
return nil
}
func (m *Manager) GetCloudConfig() config.CloudConfig {
// GetCloudConfig returns a copy of the manager's current cloud configuration,
// read under the config read lock.
func (m *HCPManager) GetCloudConfig() config.CloudConfig {
	m.cfgMu.RLock()
	cloudCfg := m.cfg.CloudConfig
	m.cfgMu.RUnlock()

	return cloudCfg
}
func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) {
func (m *HCPManager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) {
m.cfgMu.Lock()
// Save original values
originalCfg := m.cfg.CloudConfig
@ -234,7 +253,7 @@ func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudCon
}
}
func (m *Manager) SendUpdate() {
func (m *HCPManager) SendUpdate() {
m.logger.Debug("HCP triggering status update")
select {
case m.updateCh <- struct{}{}:
@ -252,7 +271,7 @@ func (m *Manager) SendUpdate() {
// and a "isRetrying" state or something so that we attempt to send update, but
// then fetch fresh info on each attempt to send so if we are already in a retry
// backoff a new push is a no-op.
func (m *Manager) sendUpdate() error {
func (m *HCPManager) sendUpdate() error {
m.cfgMu.RLock()
cfg := m.cfg
m.cfgMu.RUnlock()
@ -281,7 +300,7 @@ func (m *Manager) sendUpdate() error {
return cfg.Client.PushServerStatus(ctx, &s)
}
func (m *Manager) isRunning() bool {
func (m *HCPManager) isRunning() bool {
m.runLock.RLock()
defer m.runLock.RUnlock()
return m.running
@ -290,7 +309,7 @@ func (m *Manager) isRunning() bool {
// setRunning sets the running status of the manager to the given value. If the
// given value is the same as the current running status, it returns false. If
// current status is updated to the given status, it returns true.
func (m *Manager) setRunning(r bool) bool {
func (m *HCPManager) setRunning(r bool) bool {
m.runLock.Lock()
defer m.runLock.Unlock()
@ -298,6 +317,47 @@ func (m *Manager) setRunning(r bool) bool {
return false
}
// Initialize or close the stop channel depending what running status
// we're transitioning to. Channel must be initialized on start since
// a provider can be stopped and started multiple times.
if r {
m.stopCh = make(chan struct{})
} else {
close(m.stopCh)
}
m.running = r
return true
}
// Stop marks the manager as not running, which closes the stop channel watched
// by the update loop. It then stops the SCADA provider and the telemetry
// provider if they are configured. Finally, when both a delete function and a
// management token are configured, it deletes the management token.
//
// Calling Stop on a manager that is not running is a no-op. The only error
// returned is the one produced by the management token delete function.
func (m *HCPManager) Stop() error {
	if stopped := m.setRunning(false); !stopped {
		m.logger.Trace("HCP manager already stopped")
		return nil
	}
	m.logger.Info("HCP manager stopping")

	// Hold the config read lock for the whole teardown so the providers and the
	// token are taken from one consistent configuration.
	m.cfgMu.RLock()
	defer m.cfgMu.RUnlock()

	if scadaProvider := m.cfg.SCADAProvider; scadaProvider != nil {
		scadaProvider.Stop()
	}

	// The telemetry provider is an interface value, so also guard against an
	// interface that wraps a nil pointer.
	if telemetry := m.cfg.TelemetryProvider; telemetry != nil && !reflect.ValueOf(telemetry).IsNil() {
		telemetry.Stop()
	}

	deleteToken := m.cfg.ManagementTokenDeleterFn
	managementToken := m.cfg.CloudConfig.ManagementToken
	if deleteToken != nil && managementToken != "" {
		if err := deleteToken(managementToken); err != nil {
			return err
		}
	}

	m.logger.Info("HCP manager stopped")
	return nil
}

View File

@ -4,6 +4,7 @@
package hcp
import (
"fmt"
"io"
"testing"
"time"
@ -11,6 +12,7 @@ import (
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -31,7 +33,7 @@ func TestManager_Start(t *testing.T) {
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
cloudCfg := config.CloudConfig{
ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test",
ResourceID: "resource-id",
NodeID: "node-1",
ManagementToken: "fake-token",
}
@ -44,25 +46,18 @@ func TestManager_Start(t *testing.T) {
).Return().Once()
scadaM.EXPECT().Start().Return(nil)
telemetryProvider := &hcpProviderImpl{
httpCfg: &httpCfg{},
logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
cfg: defaultDisabledCfg(),
}
mockTelemetryCfg, err := testTelemetryCfg(&testConfig{
refreshInterval: 1 * time.Second,
})
require.NoError(t, err)
client.EXPECT().FetchTelemetryConfig(mock.Anything).Return(
mockTelemetryCfg, nil).Maybe()
telemetryM := NewMockTelemetryProvider(t)
telemetryM.EXPECT().Start(mock.Anything, &HCPProviderCfg{
HCPClient: client,
HCPConfig: &cloudCfg,
}).Return(nil).Once()
mgr := NewManager(ManagerConfig{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
StatusFn: statusF,
ManagementTokenUpserterFn: upsertManagementTokenF,
SCADAProvider: scadaM,
TelemetryProvider: telemetryProvider,
TelemetryProvider: telemetryM,
})
mgr.testUpdateSent = updateCh
ctx, cancel := context.WithCancel(context.Background())
@ -85,9 +80,6 @@ func TestManager_Start(t *testing.T) {
// Make sure after manager has stopped no more statuses are pushed.
cancel()
client.AssertExpectations(t)
require.Equal(t, client, telemetryProvider.hcpClient)
require.NotNil(t, telemetryProvider.GetHeader())
require.NotNil(t, telemetryProvider.GetHTTPClient())
}
func TestManager_StartMultipleTimes(t *testing.T) {
@ -270,3 +262,103 @@ func TestManager_SendUpdate_Periodic(t *testing.T) {
}
client.AssertExpectations(t)
}
// TestManager_Stop starts a manager wired to mock SCADA and telemetry providers,
// confirms it is sending updates, then stops it. It verifies that Stop stops both
// providers (via the mock expectations), calls the management token delete
// function, and that no further status update is sent afterwards.
func TestManager_Stop(t *testing.T) {
	client := hcpclient.NewMockClient(t)

	// Configure status functions called in sendUpdate
	statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
		return hcpclient.ServerStatus{ID: t.Name()}, nil
	}
	updateCh := make(chan struct{}, 1)
	client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()

	// Configure management token creation and cleanup
	token := "test-token"
	upsertManagementTokenCalled := make(chan struct{}, 1)
	upsertManagementTokenF := func(name, secretID string) error {
		upsertManagementTokenCalled <- struct{}{}
		if secretID != token {
			return fmt.Errorf("expected token %q, got %q", token, secretID)
		}
		return nil
	}
	deleteManagementTokenCalled := make(chan struct{}, 1)
	deleteManagementTokenF := func(secretID string) error {
		deleteManagementTokenCalled <- struct{}{}
		if secretID != token {
			return fmt.Errorf("expected token %q, got %q", token, secretID)
		}
		return nil
	}

	// Configure the SCADA provider
	scadaM := scada.NewMockProvider(t)
	scadaM.EXPECT().UpdateHCPConfig(mock.Anything).Return(nil).Once()
	scadaM.EXPECT().UpdateMeta(mock.Anything).Return().Once()
	scadaM.EXPECT().Start().Return(nil).Once()
	scadaM.EXPECT().Stop().Return(nil).Once()

	// Configure the telemetry provider
	telemetryM := NewMockTelemetryProvider(t)
	telemetryM.EXPECT().Start(mock.Anything, mock.Anything).Return(nil).Once()
	telemetryM.EXPECT().Stop().Return().Once()

	// Configure manager with all its dependencies
	mgr := NewManager(ManagerConfig{
		Logger:                    testutil.Logger(t),
		StatusFn:                  statusF,
		Client:                    client,
		ManagementTokenUpserterFn: upsertManagementTokenF,
		ManagementTokenDeleterFn:  deleteManagementTokenF,
		SCADAProvider:             scadaM,
		TelemetryProvider:         telemetryM,
		CloudConfig: config.CloudConfig{
			ManagementToken: token,
		},
	})
	mgr.testUpdateSent = updateCh
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// Start the manager
	err := mgr.Start(ctx)
	require.NoError(t, err)
	select {
	case <-updateCh:
	case <-time.After(time.Second):
		require.Fail(t, "manager did not send update in expected time")
	}
	select {
	case <-upsertManagementTokenCalled:
	case <-time.After(time.Second):
		require.Fail(t, "manager did not create token in expected time")
	}

	// Send an update to ensure the manager is running in its main loop
	mgr.SendUpdate()
	select {
	case <-updateCh:
	case <-time.After(time.Second):
		require.Fail(t, "manager did not send update in expected time")
	}

	// Stop the manager
	err = mgr.Stop()
	require.NoError(t, err)

	// Validate that the management token delete function is called
	select {
	case <-deleteManagementTokenCalled:
	case <-time.After(time.Millisecond * 100):
		require.Fail(t, "manager did not delete token in expected time")
	}

	// Send an update, expect no update since manager is stopped
	mgr.SendUpdate()
	select {
	case <-updateCh:
		require.Fail(t, "manager sent update after stopped")
	case <-time.After(time.Second):
	}
}

209
agent/hcp/mock_Manager.go Normal file
View File

@ -0,0 +1,209 @@
// Code generated by mockery v2.38.0. DO NOT EDIT.
package hcp
import (
client "github.com/hashicorp/consul/agent/hcp/client"
config "github.com/hashicorp/consul/agent/hcp/config"
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockManager is an autogenerated mock type for the Manager type
type MockManager struct {
mock.Mock
}
type MockManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockManager) EXPECT() *MockManager_Expecter {
return &MockManager_Expecter{mock: &_m.Mock}
}
// GetCloudConfig provides a mock function with given fields:
func (_m *MockManager) GetCloudConfig() config.CloudConfig {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetCloudConfig")
}
var r0 config.CloudConfig
if rf, ok := ret.Get(0).(func() config.CloudConfig); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(config.CloudConfig)
}
return r0
}
// MockManager_GetCloudConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCloudConfig'
type MockManager_GetCloudConfig_Call struct {
*mock.Call
}
// GetCloudConfig is a helper method to define mock.On call
func (_e *MockManager_Expecter) GetCloudConfig() *MockManager_GetCloudConfig_Call {
return &MockManager_GetCloudConfig_Call{Call: _e.mock.On("GetCloudConfig")}
}
func (_c *MockManager_GetCloudConfig_Call) Run(run func()) *MockManager_GetCloudConfig_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockManager_GetCloudConfig_Call) Return(_a0 config.CloudConfig) *MockManager_GetCloudConfig_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockManager_GetCloudConfig_Call) RunAndReturn(run func() config.CloudConfig) *MockManager_GetCloudConfig_Call {
_c.Call.Return(run)
return _c
}
// Start provides a mock function with given fields: _a0
func (_m *MockManager) Start(_a0 context.Context) error {
ret := _m.Called(_a0)
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockManager_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
// - _a0 context.Context
func (_e *MockManager_Expecter) Start(_a0 interface{}) *MockManager_Start_Call {
return &MockManager_Start_Call{Call: _e.mock.On("Start", _a0)}
}
func (_c *MockManager_Start_Call) Run(run func(_a0 context.Context)) *MockManager_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockManager_Start_Call) Return(_a0 error) *MockManager_Start_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockManager_Start_Call) RunAndReturn(run func(context.Context) error) *MockManager_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockManager) Stop() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Stop")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockManager_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockManager_Expecter) Stop() *MockManager_Stop_Call {
return &MockManager_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockManager_Stop_Call) Run(run func()) *MockManager_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockManager_Stop_Call) Return(_a0 error) *MockManager_Stop_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockManager_Stop_Call) RunAndReturn(run func() error) *MockManager_Stop_Call {
_c.Call.Return(run)
return _c
}
// UpdateConfig provides a mock function with given fields: _a0, _a1
func (_m *MockManager) UpdateConfig(_a0 client.Client, _a1 config.CloudConfig) {
_m.Called(_a0, _a1)
}
// MockManager_UpdateConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateConfig'
type MockManager_UpdateConfig_Call struct {
*mock.Call
}
// UpdateConfig is a helper method to define mock.On call
// - _a0 client.Client
// - _a1 config.CloudConfig
func (_e *MockManager_Expecter) UpdateConfig(_a0 interface{}, _a1 interface{}) *MockManager_UpdateConfig_Call {
return &MockManager_UpdateConfig_Call{Call: _e.mock.On("UpdateConfig", _a0, _a1)}
}
func (_c *MockManager_UpdateConfig_Call) Run(run func(_a0 client.Client, _a1 config.CloudConfig)) *MockManager_UpdateConfig_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(client.Client), args[1].(config.CloudConfig))
})
return _c
}
func (_c *MockManager_UpdateConfig_Call) Return() *MockManager_UpdateConfig_Call {
_c.Call.Return()
return _c
}
func (_c *MockManager_UpdateConfig_Call) RunAndReturn(run func(client.Client, config.CloudConfig)) *MockManager_UpdateConfig_Call {
_c.Call.Return(run)
return _c
}
// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockManager {
mock := &MockManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,115 @@
// Code generated by mockery v2.38.0. DO NOT EDIT.
package hcp
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockTelemetryProvider is an autogenerated mock type for the TelemetryProvider type
type MockTelemetryProvider struct {
mock.Mock
}
type MockTelemetryProvider_Expecter struct {
mock *mock.Mock
}
func (_m *MockTelemetryProvider) EXPECT() *MockTelemetryProvider_Expecter {
return &MockTelemetryProvider_Expecter{mock: &_m.Mock}
}
// Start provides a mock function with given fields: ctx, c
func (_m *MockTelemetryProvider) Start(ctx context.Context, c *HCPProviderCfg) error {
ret := _m.Called(ctx, c)
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *HCPProviderCfg) error); ok {
r0 = rf(ctx, c)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockTelemetryProvider_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockTelemetryProvider_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
// - ctx context.Context
// - c *HCPProviderCfg
func (_e *MockTelemetryProvider_Expecter) Start(ctx interface{}, c interface{}) *MockTelemetryProvider_Start_Call {
return &MockTelemetryProvider_Start_Call{Call: _e.mock.On("Start", ctx, c)}
}
func (_c *MockTelemetryProvider_Start_Call) Run(run func(ctx context.Context, c *HCPProviderCfg)) *MockTelemetryProvider_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*HCPProviderCfg))
})
return _c
}
func (_c *MockTelemetryProvider_Start_Call) Return(_a0 error) *MockTelemetryProvider_Start_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTelemetryProvider_Start_Call) RunAndReturn(run func(context.Context, *HCPProviderCfg) error) *MockTelemetryProvider_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockTelemetryProvider) Stop() {
_m.Called()
}
// MockTelemetryProvider_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockTelemetryProvider_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockTelemetryProvider_Expecter) Stop() *MockTelemetryProvider_Stop_Call {
return &MockTelemetryProvider_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockTelemetryProvider_Stop_Call) Run(run func()) *MockTelemetryProvider_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockTelemetryProvider_Stop_Call) Return() *MockTelemetryProvider_Stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockTelemetryProvider_Stop_Call) RunAndReturn(run func()) *MockTelemetryProvider_Stop_Call {
_c.Call.Return(run)
return _c
}
// NewMockTelemetryProvider creates a new instance of MockTelemetryProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockTelemetryProvider(t interface {
mock.TestingT
Cleanup(func())
}) *MockTelemetryProvider {
mock := &MockTelemetryProvider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -35,6 +35,7 @@ var (
)
// Ensure hcpProviderImpl implements telemetry provider interfaces.
var _ TelemetryProvider = &hcpProviderImpl{}
var _ telemetry.ConfigProvider = &hcpProviderImpl{}
var _ telemetry.EndpointProvider = &hcpProviderImpl{}
var _ client.MetricsClientProvider = &hcpProviderImpl{}
@ -56,6 +57,9 @@ type hcpProviderImpl struct {
// running indicates if the HCP telemetry config provider has been started
running bool
// stopCh is used to signal that the telemetry config provider should stop running.
stopCh chan struct{}
// hcpClient is an authenticated client used to make HTTP requests to HCP.
hcpClient client.Client
@ -94,6 +98,12 @@ type httpCfg struct {
client *retryablehttp.Client
}
//go:generate mockery --name TelemetryProvider --with-expecter --inpackage
// TelemetryProvider is the interface the HCP manager uses to start and stop the
// telemetry provider. It is implemented by hcpProviderImpl.
type TelemetryProvider interface {
// Start starts the provider with the given HCP client and configuration.
Start(ctx context.Context, c *HCPProviderCfg) error
// Stop stops the provider.
Stop()
}
type HCPProviderCfg struct {
HCPClient client.Client
HCPConfig config.CloudConfigurer
@ -111,9 +121,9 @@ func NewHCPProvider(ctx context.Context) *hcpProviderImpl {
return h
}
// Run starts a process that continuously checks for updates to the telemetry configuration
// Start starts a process that continuously checks for updates to the telemetry configuration
// by making a request to HCP. It only starts running if it's not already running.
func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error {
func (h *hcpProviderImpl) Start(ctx context.Context, c *HCPProviderCfg) error {
changed := h.setRunning(true)
if !changed {
// Provider is already running.
@ -139,7 +149,7 @@ func (h *hcpProviderImpl) run(ctx context.Context) error {
// Try to initialize config once before starting periodic fetch.
h.updateConfig(ctx)
ticker := time.NewTicker(h.cfg.refreshInterval)
ticker := time.NewTicker(h.getRefreshInterval())
defer ticker.Stop()
for {
select {
@ -149,6 +159,8 @@ func (h *hcpProviderImpl) run(ctx context.Context) error {
}
case <-ctx.Done():
return nil
case <-h.stopCh:
return nil
}
}
}
@ -224,6 +236,13 @@ func (h *hcpProviderImpl) modifyDynamicCfg(newCfg *dynamicConfig) {
metrics.IncrCounter(internalMetricRefreshSuccess, 1)
}
// getRefreshInterval returns the refresh interval from the provider's current
// configuration, read under the read lock.
func (h *hcpProviderImpl) getRefreshInterval() time.Duration {
	h.rw.RLock()
	interval := h.cfg.refreshInterval
	h.rw.RUnlock()

	return interval
}
// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (h *hcpProviderImpl) GetEndpoint() *url.URL {
h.rw.RLock()
@ -322,7 +341,32 @@ func (h *hcpProviderImpl) setRunning(r bool) bool {
return false
}
// Initialize or close the stop channel depending what running status
// we're transitioning to. Channel must be initialized on start since
// a provider can be stopped and started multiple times.
if r {
h.stopCh = make(chan struct{})
} else {
close(h.stopCh)
}
h.running = r
return true
}
// Stop marks the provider as not running, which closes the stop channel that the
// run loop selects on, and then swaps in the default disabled configuration
// under the write lock. Stopping a provider that is not running is a no-op.
func (h *hcpProviderImpl) Stop() {
	if stopped := h.setRunning(false); !stopped {
		h.logger.Trace("telemetry config provider already stopped")
		return
	}

	// Reset to the disabled configuration.
	h.rw.Lock()
	h.cfg = defaultDisabledCfg()
	h.rw.Unlock()

	h.logger.Debug("telemetry config provider stopped")
}

View File

@ -286,7 +286,7 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
}
}
func TestTelemetryConfigProvider_Run(t *testing.T) {
func TestTelemetryConfigProvider_Start(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -311,22 +311,23 @@ func TestTelemetryConfigProvider_Run(t *testing.T) {
mockHCPCfg := &config.MockCloudCfg{}
// Run provider
go provider.Run(context.Background(), &HCPProviderCfg{
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
var count int
// Expect at least two update config calls to validate provider is running
// and has entered the main run loop
select {
case <-testUpdateConfigCh:
// Expect/wait for at least two update config calls
count++
if count > 2 {
break
}
case <-time.After(time.Second):
require.Fail(t, "provider did not attempt to update config in expected time")
}
select {
case <-testUpdateConfigCh:
case <-time.After(time.Millisecond * 500):
require.Fail(t, "provider did not attempt to update config in expected time")
}
mockClient.AssertExpectations(t)
}
@ -351,11 +352,11 @@ func TestTelemetryConfigProvider_MultipleRun(t *testing.T) {
mockHCPCfg := &config.MockCloudCfg{}
// Run provider twice in parallel
go provider.Run(context.Background(), &HCPProviderCfg{
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
go provider.Run(context.Background(), &HCPProviderCfg{
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
@ -374,7 +375,7 @@ func TestTelemetryConfigProvider_MultipleRun(t *testing.T) {
}
// Try calling run again, should not update again
provider.Run(context.Background(), &HCPProviderCfg{
provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
@ -435,6 +436,62 @@ func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) {
}
}
// TestTelemetryConfigProvider_Stop starts a provider against a mock client, waits
// for two config update signals, then stops it. It verifies that the provider's
// config equals the default disabled config after Stop and that no further update
// signal arrives within one second.
func TestTelemetryConfigProvider_Stop(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
provider := NewHCPProvider(ctx)
// Buffered channel the provider signals on when it attempts a config update.
testUpdateConfigCh := make(chan struct{}, 1)
provider.testUpdateConfigCh = testUpdateConfigCh
// Configure mocks
mockClient := client.NewMockClient(t)
mTelemetryCfg, err := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil)
mockHCPCfg := &config.MockCloudCfg{}
// Run provider
provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
// Wait for at least two update config calls to ensure provider is running
// and has entered the main run loop
select {
case <-testUpdateConfigCh:
case <-time.After(time.Second):
require.Fail(t, "provider did not attempt to update config in expected time")
}
select {
case <-testUpdateConfigCh:
case <-time.After(time.Millisecond * 500):
require.Fail(t, "provider did not attempt to update config in expected time")
}
// Stop the provider
provider.Stop()
// Stop is expected to have replaced the config with the disabled default.
require.Equal(t, defaultDisabledCfg(), provider.cfg)
select {
case <-testUpdateConfigCh:
require.Fail(t, "provider should not attempt to update config after stop")
case <-time.After(time.Second):
// Success, no updates have happened after stopping
}
mockClient.AssertExpectations(t)
}
// mockRaceClient is a mock HCP client that fetches TelemetryConfig.
// The mock TelemetryConfig returned can be manually updated at any time.
// It manages concurrent read/write access to config with a sync.RWMutex.
@ -504,7 +561,7 @@ func TestTelemetryConfigProvider_Race(t *testing.T) {
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
provider := NewHCPProvider(ctx)
err = provider.Run(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}})
err = provider.Start(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}})
require.NoError(t, err)
for count := 0; count < testRaceWriteSampleCount; count++ {

View File

@ -44,7 +44,7 @@ func LinkController(
hcpClientFn HCPClientFn,
cfg config.CloudConfig,
dataDir string,
hcpManager *hcp.Manager,
hcpManager hcp.Manager,
) *controller.Controller {
return controller.NewController("link", pbhcp.LinkType).
// Placement is configured to each server so that the HCP manager is started
@ -70,7 +70,7 @@ type linkReconciler struct {
hcpAllowV2ResourceApis bool
hcpClientFn HCPClientFn
dataDir string
hcpManager *hcp.Manager
hcpManager hcp.Manager
}
func hcpAccessLevelToConsul(level *gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevel) pbhcp.AccessLevel {
@ -101,7 +101,7 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
switch {
case status.Code(err) == codes.NotFound:
rt.Logger.Trace("link has been deleted")
return nil
return cleanup(rt, r.hcpManager, r.dataDir)
case err != nil:
rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
return err
@ -120,10 +120,17 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
}
if resource.IsMarkedForDeletion(res) {
if err = cleanup(ctx, rt, res, r.dataDir); err != nil {
if err = cleanup(rt, r.hcpManager, r.dataDir); err != nil {
rt.Logger.Error("error cleaning up link resource", "error", err)
return err
}
err := ensureDeleted(ctx, rt, res)
if err != nil {
rt.Logger.Error("error deleting link resource", "error", err)
return err
}
return nil
}

View File

@ -10,7 +10,6 @@ import (
"path/filepath"
"testing"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
"github.com/stretchr/testify/mock"
@ -108,15 +107,11 @@ func (suite *controllerSuite) TestController_Ok() {
ConsulConfig: "{}",
}, nil).Once()
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
return hcpclient.ServerStatus{ID: suite.T().Name()}, nil
}
mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}).
Return(nil).Once()
hcpMgr := hcp.NewManager(hcp.ManagerConfig{
Logger: hclog.NewNullLogger(),
StatusFn: statusF,
})
hcpMgr := hcp.NewMockManager(suite.T())
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
hcpMgr.EXPECT().Stop().Return(nil)
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
suite.dataDir = dataDir
@ -173,15 +168,11 @@ func (suite *controllerSuite) TestController_Initialize() {
ResourceID: types.GenerateTestResourceID(suite.T()),
}
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
return hcpclient.ServerStatus{ID: suite.T().Name()}, nil
}
mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}).
Return(nil).Once()
hcpMgr := hcp.NewManager(hcp.ManagerConfig{
Logger: hclog.NewNullLogger(),
StatusFn: statusF,
})
hcpMgr := hcp.NewMockManager(suite.T())
hcpMgr.EXPECT().GetCloudConfig().Return(cloudCfg)
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
hcpMgr.EXPECT().Stop().Return(nil)
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
suite.dataDir = dataDir
@ -225,13 +216,16 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() {
_, mockClientFunc := mockHcpClientFn(suite.T())
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
suite.dataDir = dataDir
hcpMgr := hcp.NewMockManager(suite.T())
hcpMgr.EXPECT().Stop().Return(nil)
mgr.Register(LinkController(
true,
false,
mockClientFunc,
config.CloudConfig{},
dataDir,
hcp.NewManager(hcp.ManagerConfig{}),
hcpMgr,
))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
@ -268,9 +262,11 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
suite.dataDir = dataDir
hcpMgr := hcp.NewManager(hcp.ManagerConfig{
Logger: hclog.NewNullLogger(),
})
hcpMgr := hcp.NewMockManager(suite.T())
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
hcpMgr.EXPECT().Stop().Return(nil)
mgr.Register(LinkController(
true,
@ -322,35 +318,42 @@ func (suite *controllerSuite) TestController_GetClusterError() {
suite.T().Run(name, func(t *testing.T) {
// Run the controller manager
mgr := controller.NewManager(suite.client, suite.rt.Logger)
mockClient, mockClientFunc := mockHcpClientFn(suite.T())
mockClient, mockClientFunc := mockHcpClientFn(t)
mockClient.EXPECT().GetCluster(mock.Anything).Return(nil, tc.expectErr)
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
dataDir := testutil.TempDir(t, "test-link-controller")
suite.dataDir = dataDir
hcpMgr := hcp.NewMockManager(t)
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
hcpMgr.EXPECT().Stop().Return(nil)
mgr.Register(LinkController(
true,
true,
mockClientFunc,
config.CloudConfig{},
dataDir,
hcp.NewManager(hcp.ManagerConfig{}),
hcpMgr,
))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
ctx, cancel := context.WithCancel(suite.ctx)
t.Cleanup(cancel)
go mgr.Run(ctx)
linkData := &pbhcp.Link{
ClientId: "abc",
ClientSecret: "abc",
ResourceId: types.GenerateTestResourceID(suite.T()),
ResourceId: types.GenerateTestResourceID(t),
}
link := rtest.Resource(pbhcp.LinkType, "global").
WithData(suite.T(), linkData).
Write(suite.T(), suite.client)
WithData(t, linkData).
Write(t, suite.client)
suite.T().Cleanup(suite.deleteResourceFunc(link.Id))
t.Cleanup(suite.deleteResourceFunc(link.Id))
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, tc.expectCondition)
suite.client.WaitForStatusCondition(t, link.Id, StatusKey, tc.expectCondition)
})
}
}

View File

@ -8,28 +8,29 @@ import (
"os"
"path/filepath"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/hcp/bootstrap"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// cleanup tears down the local state associated with a link: it stops the HCP
// manager and, when dataDir is non-empty, removes the hcp-config directory
// (bootstrap.SubDir) beneath it. It returns the error from removing that
// directory, if any.
//
// NOTE(review): this span is rendered from a diff whose +/- markers were
// stripped, so two signature lines appear back to back. The first (taking ctx
// and res) looks like the previous signature and the second (taking
// hcpManager) the current one — confirm against the real file.
func cleanup(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, dataDir string) error {
func cleanup(rt controller.Runtime, hcpManager hcp.Manager, dataDir string) error {
rt.Logger.Trace("cleaning up link resource")
rt.Logger.Debug("stopping HCP manager")
// NOTE(review): whatever Stop returns is discarded here. The test mocks in
// this change configure Stop().Return(nil), which suggests it returns an
// error — confirm whether a failure should at least be logged.
hcpManager.Stop()
// An empty dataDir means there is no on-disk HCP config location to remove.
if dataDir != "" {
hcpConfigDir := filepath.Join(dataDir, bootstrap.SubDir)
rt.Logger.Debug("deleting hcp-config dir", "dir", hcpConfigDir)
// os.RemoveAll returns nil when the path does not exist, so this step can
// be repeated without failing on an already-removed directory.
err := os.RemoveAll(hcpConfigDir)
if err != nil {
rt.Logger.Error("failed to delete hcp-config dir", "dir", hcpConfigDir, "err", err)
return err
}
}
// NOTE(review): the ensureDeleted call below uses ctx and res, which only the
// first signature line provides; it presumably belongs to the previous
// version of this function — verify before relying on it.
err := ensureDeleted(ctx, rt, res)
if err != nil {
return err
}
return nil
}

View File

@ -15,7 +15,7 @@ type Dependencies struct {
ResourceApisEnabled bool
HCPAllowV2ResourceApis bool
DataDir string
HCPManager *hcp.Manager
HCPManager *hcp.HCPManager
}
func Register(mgr *controller.Manager, deps Dependencies) {