diff --git a/agent/consul/server.go b/agent/consul/server.go index aef7e7a5b0..087d539c24 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -898,11 +898,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, &lib.StopChannelContext{StopCh: shutdownCh}, logger.Named("hcp-link-watcher"), pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan), - s.hcpManager, - hcpclient.NewClient, - bootstrap.LoadManagementToken, - flat.HCP.Config, - flat.HCP.DataDir, + hcp.HCPManagerLifecycleFn( + s.hcpManager, + hcpclient.NewClient, + bootstrap.LoadManagementToken, + flat.HCP.Config, + flat.HCP.DataDir, + ), ) s.controllerManager = controller.NewManager( diff --git a/agent/hcp/link_watch.go b/agent/hcp/link_watch.go index 3cd5343deb..b89ba942e4 100644 --- a/agent/hcp/link_watch.go +++ b/agent/hcp/link_watch.go @@ -15,59 +15,54 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -func StartLinkWatch( - ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, -) chan *pbresource.WatchEvent { - var watchClient pbresource.ResourceService_WatchListClient - watchListErrorBackoff := &retry.Waiter{ - MinFailures: 1, - MinWait: 1 * time.Second, +type LinkEventHandler = func(context.Context, hclog.Logger, *pbresource.WatchEvent) + +func handleLinkEvents(ctx context.Context, logger hclog.Logger, watchClient pbresource.ResourceService_WatchListClient, linkEventHandler LinkEventHandler) { + for { + select { + case <-ctx.Done(): + logger.Debug("context canceled, exiting") + return + default: + watchEvent, err := watchClient.Recv() + + if err != nil { + logger.Error("error receiving link watch event", "error", err) + return + } + + linkEventHandler(ctx, logger, watchEvent) + } + } +} + +func RunHCPLinkWatcher( + ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, linkEventHandler LinkEventHandler, +) { + errorBackoff := &retry.Waiter{ + MinFailures: 10, + MinWait: 0, MaxWait: 1 * time.Minute, } for { - var err error - watchClient, err = client.WatchList( - ctx, &pbresource.WatchListRequest{ - Type: pbhcp.LinkType, - NamePrefix: hcpctl.LinkName, - }, - ) - if err != nil { - logger.Error("failed to create watch on Link", "error", err) - watchListErrorBackoff.Wait(ctx) - continue - } - - break - } - - eventCh := make(chan *pbresource.WatchEvent) - go func() { - errorBackoff := &retry.Waiter{ - MinFailures: 1, - MinWait: 1 * time.Second, - MaxWait: 1 * time.Minute, - } - for { - select { - case <-ctx.Done(): - logger.Debug("context canceled, exiting") - close(eventCh) - return - default: - watchEvent, err := watchClient.Recv() - - if err != nil { - logger.Error("error receiving link watch event", "error", err) - errorBackoff.Wait(ctx) - continue - } - - errorBackoff.Reset() - eventCh <- watchEvent + select { + case <-ctx.Done(): + logger.Debug("context canceled, exiting") + return + default: + watchClient, err := client.WatchList( + ctx, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: hcpctl.LinkName, + }, + ) + if err != nil { + logger.Error("failed to create watch on Link", "error", err) + errorBackoff.Wait(ctx) + continue } + errorBackoff.Reset() + handleLinkEvents(ctx, logger, watchClient, linkEventHandler) } - }() - - return eventCh + } } diff --git a/agent/hcp/link_watch_test.go b/agent/hcp/link_watch_test.go index 419d11993d..22d2204a81 100644 --- a/agent/hcp/link_watch_test.go +++ b/agent/hcp/link_watch_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/mock" 
"github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/hashicorp/go-hclog" @@ -21,42 +22,80 @@ import ( // This tests that when we get a watch event from the Recv call, we get that same event on the // output channel, then we -func TestLinkWatch_Ok(t *testing.T) { - testWatchEvent := &pbresource.WatchEvent{} +func TestLinkWatcher_Ok(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + testWatchEvent := &pbresource.WatchEvent{} mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) mockWatchListClient.EXPECT().Recv().Return(testWatchEvent, nil) + eventCh := make(chan *pbresource.WatchEvent) + mockLinkHandler := func(_ context.Context, _ hclog.Logger, event *pbresource.WatchEvent) { + eventCh <- event + } + client := mockpbresource.NewResourceServiceClient(t) client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ Type: pbhcp.LinkType, NamePrefix: hcpctl.LinkName, }).Return(mockWatchListClient, nil) - linkWatchCh := StartLinkWatch(context.Background(), hclog.Default(), client) - event := <-linkWatchCh - require.Equal(t, testWatchEvent, event) + go RunHCPLinkWatcher(ctx, hclog.Default(), client, mockLinkHandler) + + // Assert that the link handler is called with the testWatchEvent + receivedWatchEvent := <-eventCh + require.Equal(t, testWatchEvent, receivedWatchEvent) } -// This tests ensures that when the context is canceled, the linkWatchCh is closed -func TestLinkWatch_ContextCanceled(t *testing.T) { - mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) - // Recv may not be called if the context is cancelled before the `select` block - // within the StartLinkWatch goroutine - mockWatchListClient.EXPECT().Recv().Return(nil, errors.New("context canceled")).Maybe() - - client := mockpbresource.NewResourceServiceClient(t) - client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ - Type: pbhcp.LinkType, - NamePrefix: hcpctl.LinkName, - }).Return(mockWatchListClient, nil) - +func TestLinkWatcher_RecvError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - linkWatchCh := StartLinkWatch(ctx, hclog.Default(), client) - cancel() + // Our mock WatchListClient will simulate 5 errors, then will cancel the context. + // We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries) + // before exiting due to context cancellation. + mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) + numFailures := 5 + failures := 0 + mockWatchListClient.EXPECT().Recv().RunAndReturn(func() (*pbresource.WatchEvent, error) { + if failures < numFailures { + failures++ + return nil, errors.New("unexpectedError") + } + defer cancel() + return &pbresource.WatchEvent{}, nil + }) - // Ensure the linkWatchCh is closed - _, ok := <-linkWatchCh - require.False(t, ok) + client := mockpbresource.NewResourceServiceClient(t) + client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: hcpctl.LinkName, + }).Return(mockWatchListClient, nil).Times(numFailures + 1) + + RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {}) +} + +func TestLinkWatcher_WatchListError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // Our mock WatchList will simulate 5 errors, then will cancel the context. 
+ // We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries) + // before exiting due to context cancellation. + numFailures := 5 + failures := 0 + + client := mockpbresource.NewResourceServiceClient(t) + client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: hcpctl.LinkName, + }).RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) { + if failures < numFailures { + failures++ + return nil, errors.New("unexpectedError") + } + defer cancel() + return mockpbresource.NewResourceService_WatchListClient(t), nil + }).Times(numFailures + 1) + + RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {}) } diff --git a/agent/hcp/link_monitor.go b/agent/hcp/manager_lifecycle.go similarity index 65% rename from agent/hcp/link_monitor.go rename to agent/hcp/manager_lifecycle.go index 0c6949c66d..6b7b6a46dc 100644 --- a/agent/hcp/link_monitor.go +++ b/agent/hcp/manager_lifecycle.go @@ -18,51 +18,19 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -// RunHCPLinkWatcher watches the HCP Link resource and kicks off a goroutine -// to manage the lifecycle of the HCP manager based on HCP Link events. -// -// StartLinkWatch will use the WatchList API in order to watch the HCP Link -// and return a channel which will contain WatchEvents. -// If there are any errors in doing so, it will continuously retry until successful. -// -// Once the channel is created, we call MonitorHCPLink which will runs continuously -// and handles starting/stopping the HCP manager. -func RunHCPLinkWatcher( - ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, m Manager, - hcpClientFn func(cfg config.CloudConfig) (hcpclient.Client, error), - loadMgmtTokenFn func( - ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string, - ) (string, error), - cloudConfig config.CloudConfig, - dataDir string, -) { - hcpLinkWatchCh := StartLinkWatch( - ctx, - logger, - client, - ) - - MonitorHCPLink(ctx, logger, m, hcpLinkWatchCh, hcpClientFn, loadMgmtTokenFn, cloudConfig, dataDir) -} - -// MonitorHCPLink monitors the status of the HCP Link and based on that, manages -// the lifecycle of the HCP Manager. It's designed to be run in its own goroutine -// for the life of a server agent. It should be run even if HCP is not configured -// yet for servers. When an HCP Link is created, it will Start the Manager and -// when an HCP Link is deleted, it will Stop the Manager. -func MonitorHCPLink( - ctx context.Context, - logger hclog.Logger, +// HCPManagerLifecycleFn returns a LinkEventHandler function which will appropriately +// Start and Stop the HCP Manager based on the Link event received. If a link is upserted, +// the HCP Manager is started, and if a link is deleted, the HCP manager is stopped. 
+func HCPManagerLifecycleFn( m Manager, - hcpLinkEventCh chan *pbresource.WatchEvent, hcpClientFn func(cfg config.CloudConfig) (hcpclient.Client, error), loadMgmtTokenFn func( ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string, ) (string, error), cloudConfig config.CloudConfig, dataDir string, -) { - for watchEvent := range hcpLinkEventCh { +) LinkEventHandler { + return func(ctx context.Context, logger hclog.Logger, watchEvent *pbresource.WatchEvent) { // This indicates that a Link was deleted if watchEvent.GetDelete() != nil { logger.Debug("HCP Link deleted, stopping HCP manager") @@ -80,7 +48,7 @@ func MonitorHCPLink( if err != nil { logger.Error("error stopping HCP manager", "error", err) } - continue + return } // This indicates that a Link was either created or updated @@ -91,12 +59,12 @@ func MonitorHCPLink( var link pbhcp.Link if err := res.GetData().UnmarshalTo(&link); err != nil { logger.Error("error unmarshalling link data", "error", err) - continue + return } if validated, reason := hcpctl.IsValidated(res); !validated { logger.Debug("HCP Link not validated, not starting manager", "reason", reason) - continue + return } // Update the HCP manager configuration with the link values @@ -113,7 +81,7 @@ func MonitorHCPLink( hcpClient, err := hcpClientFn(mergedCfg) if err != nil { logger.Error("error creating HCP client", "error", err) - continue + return } // Load the management token if access is set to read-write. Read-only clusters @@ -123,7 +91,7 @@ func MonitorHCPLink( token, err = loadMgmtTokenFn(ctx, logger, hcpClient, dataDir) if err != nil { logger.Error("error loading management token", "error", err) - continue + return } } diff --git a/agent/hcp/link_monitor_test.go b/agent/hcp/manager_lifecycle_test.go similarity index 92% rename from agent/hcp/link_monitor_test.go rename to agent/hcp/manager_lifecycle_test.go index 91af1cedbd..b40a772ab4 100644 --- a/agent/hcp/link_monitor_test.go +++ b/agent/hcp/manager_lifecycle_test.go @@ -9,7 +9,6 @@ import ( "io" "os" "path/filepath" - "sync" "testing" "github.com/stretchr/testify/mock" @@ -27,10 +26,12 @@ import ( "github.com/hashicorp/consul/sdk/testutil" ) -func TestMonitorHCPLink(t *testing.T) { +func TestHCPManagerLifecycleFn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) + logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard}) + mockHCPClient := hcpclient.NewMockClient(t) mockHcpClientFn := func(_ config.CloudConfig) (hcpclient.Client, error) { return mockHCPClient, nil @@ -178,8 +179,6 @@ func TestMonitorHCPLink(t *testing.T) { test.applyMocksAndAssertions(t2, mgr, &link) } - linkWatchCh := make(chan *pbresource.WatchEvent) - testHcpClientFn := mockHcpClientFn if test.hcpClientFn != nil { testHcpClientFn = test.hcpClientFn @@ -190,16 +189,10 @@ func TestMonitorHCPLink(t *testing.T) { testLoadMgmtToken = test.loadMgmtTokenFn } - // Start MonitorHCPLink - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - MonitorHCPLink( - ctx, hclog.New(&hclog.LoggerOptions{Output: io.Discard}), mgr, linkWatchCh, testHcpClientFn, - testLoadMgmtToken, existingCfg, dataDir, - ) - }() + updateManagerLifecycle := HCPManagerLifecycleFn( + mgr, testHcpClientFn, + testLoadMgmtToken, existingCfg, dataDir, + ) upsertEvent := &pbresource.WatchEvent_Upsert{ Resource: &pbresource.Resource{ @@ -219,22 +212,19 @@ func TestMonitorHCPLink(t *testing.T) { test.mutateUpsertEvent(upsertEvent) } - linkWatchCh <- &pbresource.WatchEvent{ + // Handle upsert event + 
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{ Event: &pbresource.WatchEvent_Upsert_{ Upsert: upsertEvent, }, - } + }) - // Delete link, expect HCP manager to be stopped - linkWatchCh <- &pbresource.WatchEvent{ + // Handle delete event. This should stop HCP manager + updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{ Event: &pbresource.WatchEvent_Delete_{ Delete: &pbresource.WatchEvent_Delete{}, }, - } - - // Wait for MonitorHCPLink to return before assertions run - close(linkWatchCh) - wg.Wait() + }) // Ensure hcp-config directory is removed file := filepath.Join(dataDir, constants.SubDir) diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index 09af47bf35..8377379172 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -5,7 +5,6 @@ package hcp import ( "fmt" - hcpctl "github.com/hashicorp/consul/internal/hcp" "io" "testing" "time" @@ -20,17 +19,17 @@ import ( hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" + hcpctl "github.com/hashicorp/consul/internal/hcp" pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestManager_MonitorHCPLink(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) + logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard}) - linkWatchCh := make(chan *pbresource.WatchEvent) mgr := NewManager( ManagerConfig{ Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), @@ -45,9 +44,9 @@ func TestManager_MonitorHCPLink(t *testing.T) { } require.False(t, mgr.isRunning()) - go MonitorHCPLink( - ctx, hclog.New(&hclog.LoggerOptions{Output: io.Discard}), mgr, linkWatchCh, mockHcpClientFn, loadMgmtTokenFn, - config.CloudConfig{}, "", + updateManagerLifecycle := HCPManagerLifecycleFn( + mgr, mockHcpClientFn, + loadMgmtTokenFn, config.CloudConfig{}, "", ) // Set up a link @@ -59,7 +58,7 @@ func TestManager_MonitorHCPLink(t *testing.T) { } linkResource, err := anypb.New(&link) require.NoError(t, err) - linkWatchCh <- &pbresource.WatchEvent{ + updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{ Event: &pbresource.WatchEvent_Upsert_{ Upsert: &pbresource.WatchEvent_Upsert{ Resource: &pbresource.Resource{ @@ -76,14 +75,10 @@ func TestManager_MonitorHCPLink(t *testing.T) { }, }, }, - } + }) // Validate that the HCP manager is started - retry.Run( - t, func(r *retry.R) { - require.True(r, mgr.isRunning()) - }, - ) + require.True(t, mgr.isRunning()) } func TestManager_Start(t *testing.T) {
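For reference, the refactored pieces compose as follows: RunHCPLinkWatcher owns the WatchList retry loop and forwards each Link watch event to whatever LinkEventHandler it is given, while HCPManagerLifecycleFn builds the handler that starts the HCP manager on a Link upsert and stops it on a Link delete. Below is a minimal usage sketch, not part of the diff, mirroring the wiring in the server.go hunk above; ctx, logger, resourceClient, hcpManager, cloudCfg, and dataDir stand in for values the surrounding server setup already has.

// Sketch only: mirrors the registration shown in agent/consul/server.go above.
// RunHCPLinkWatcher blocks until ctx is canceled, so it runs in its own goroutine.
go hcp.RunHCPLinkWatcher(
	ctx,                              // e.g. a context tied to server shutdown
	logger.Named("hcp-link-watcher"), // hclog.Logger
	resourceClient,                   // pbresource.ResourceServiceClient
	hcp.HCPManagerLifecycleFn(
		hcpManager,                    // hcp.Manager to start/stop on Link upsert/delete
		hcpclient.NewClient,           // builds an HCP client from the merged cloud config
		bootstrap.LoadManagementToken, // loads the management token for read/write clusters
		cloudCfg,                      // the agent's static HCP cloud configuration
		dataDir,                       // directory containing the hcp-config subdirectory
	),
)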