Refactor HCP link watcher to use a single goroutine.

Previously, if the WatchClient errored, we would never have recovered
because we never retried creating the stream. With this change,
a single goroutine runs for the life of the server agent, and if
the WatchClient stream ever errors, we retry creating the stream
with exponential backoff.
pull/20401/head
Nick Cellino 2024-02-09 13:53:12 -05:00
parent 5490d7586d
commit a920a1458f
6 changed files with 146 additions and 157 deletions
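
At a high level, the change replaces the channel-based StartLinkWatch/MonitorHCPLink pair with a single long-running watcher goroutine. The snippet below is a condensed sketch of that pattern, derived from the code in the diff that follows; the inner Recv loop lives in handleLinkEvents and is shown here only as a call:

func RunHCPLinkWatcher(
	ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, handler LinkEventHandler,
) {
	// A single retry.Waiter governs recreation of the WatchList stream with exponential backoff.
	backoff := &retry.Waiter{MinFailures: 10, MinWait: 0, MaxWait: 1 * time.Minute}
	for {
		select {
		case <-ctx.Done():
			return
		default:
			watchClient, err := client.WatchList(ctx, &pbresource.WatchListRequest{
				Type:       pbhcp.LinkType,
				NamePrefix: hcpctl.LinkName,
			})
			if err != nil {
				logger.Error("failed to create watch on Link", "error", err)
				backoff.Wait(ctx)
				continue
			}
			backoff.Reset()
			// handleLinkEvents blocks until Recv fails; the outer loop then recreates the stream.
			handleLinkEvents(ctx, logger, watchClient, handler)
		}
	}
}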

View File

@@ -898,11 +898,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
&lib.StopChannelContext{StopCh: shutdownCh},
logger.Named("hcp-link-watcher"),
pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan),
s.hcpManager,
hcpclient.NewClient,
bootstrap.LoadManagementToken,
flat.HCP.Config,
flat.HCP.DataDir,
hcp.HCPManagerLifecycleFn(
s.hcpManager,
hcpclient.NewClient,
bootstrap.LoadManagementToken,
flat.HCP.Config,
flat.HCP.DataDir,
),
)
s.controllerManager = controller.NewManager(

View File

@@ -15,59 +15,54 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
func StartLinkWatch(
ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient,
) chan *pbresource.WatchEvent {
var watchClient pbresource.ResourceService_WatchListClient
watchListErrorBackoff := &retry.Waiter{
MinFailures: 1,
MinWait: 1 * time.Second,
type LinkEventHandler = func(context.Context, hclog.Logger, *pbresource.WatchEvent)
func handleLinkEvents(ctx context.Context, logger hclog.Logger, watchClient pbresource.ResourceService_WatchListClient, linkEventHandler LinkEventHandler) {
for {
select {
case <-ctx.Done():
logger.Debug("context canceled, exiting")
return
default:
watchEvent, err := watchClient.Recv()
if err != nil {
logger.Error("error receiving link watch event", "error", err)
return
}
linkEventHandler(ctx, logger, watchEvent)
}
}
}
func RunHCPLinkWatcher(
ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, linkEventHandler LinkEventHandler,
) {
errorBackoff := &retry.Waiter{
MinFailures: 10,
MinWait: 0,
MaxWait: 1 * time.Minute,
}
for {
var err error
watchClient, err = client.WatchList(
ctx, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
},
)
if err != nil {
logger.Error("failed to create watch on Link", "error", err)
watchListErrorBackoff.Wait(ctx)
continue
}
break
}
eventCh := make(chan *pbresource.WatchEvent)
go func() {
errorBackoff := &retry.Waiter{
MinFailures: 1,
MinWait: 1 * time.Second,
MaxWait: 1 * time.Minute,
}
for {
select {
case <-ctx.Done():
logger.Debug("context canceled, exiting")
close(eventCh)
return
default:
watchEvent, err := watchClient.Recv()
if err != nil {
logger.Error("error receiving link watch event", "error", err)
errorBackoff.Wait(ctx)
continue
}
errorBackoff.Reset()
eventCh <- watchEvent
select {
case <-ctx.Done():
logger.Debug("context canceled, exiting")
return
default:
watchClient, err := client.WatchList(
ctx, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
},
)
if err != nil {
logger.Error("failed to create watch on Link", "error", err)
errorBackoff.Wait(ctx)
continue
}
errorBackoff.Reset()
handleLinkEvents(ctx, logger, watchClient, linkEventHandler)
}
}()
return eventCh
}
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
@@ -21,42 +22,80 @@ import (
// This tests that when we get a watch event from the Recv call, the provided
// link event handler is called with that same event.
func TestLinkWatch_Ok(t *testing.T) {
testWatchEvent := &pbresource.WatchEvent{}
func TestLinkWatcher_Ok(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
testWatchEvent := &pbresource.WatchEvent{}
mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t)
mockWatchListClient.EXPECT().Recv().Return(testWatchEvent, nil)
eventCh := make(chan *pbresource.WatchEvent)
mockLinkHandler := func(_ context.Context, _ hclog.Logger, event *pbresource.WatchEvent) {
eventCh <- event
}
client := mockpbresource.NewResourceServiceClient(t)
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
}).Return(mockWatchListClient, nil)
linkWatchCh := StartLinkWatch(context.Background(), hclog.Default(), client)
event := <-linkWatchCh
require.Equal(t, testWatchEvent, event)
go RunHCPLinkWatcher(ctx, hclog.Default(), client, mockLinkHandler)
// Assert that the link handler is called with the testWatchEvent
receivedWatchEvent := <-eventCh
require.Equal(t, testWatchEvent, receivedWatchEvent)
}
// This test ensures that when the context is canceled, the linkWatchCh is closed
func TestLinkWatch_ContextCanceled(t *testing.T) {
mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t)
// Recv may not be called if the context is cancelled before the `select` block
// within the StartLinkWatch goroutine
mockWatchListClient.EXPECT().Recv().Return(nil, errors.New("context canceled")).Maybe()
client := mockpbresource.NewResourceServiceClient(t)
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
}).Return(mockWatchListClient, nil)
func TestLinkWatcher_RecvError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
linkWatchCh := StartLinkWatch(ctx, hclog.Default(), client)
cancel()
// Our mock WatchListClient will simulate 5 errors, then will cancel the context.
// We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries)
// before exiting due to context cancellation.
mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t)
numFailures := 5
failures := 0
mockWatchListClient.EXPECT().Recv().RunAndReturn(func() (*pbresource.WatchEvent, error) {
if failures < numFailures {
failures++
return nil, errors.New("unexpectedError")
}
defer cancel()
return &pbresource.WatchEvent{}, nil
})
// Ensure the linkWatchCh is closed
_, ok := <-linkWatchCh
require.False(t, ok)
client := mockpbresource.NewResourceServiceClient(t)
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
}).Return(mockWatchListClient, nil).Times(numFailures + 1)
RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {})
}
func TestLinkWatcher_WatchListError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// Our mock WatchList will simulate 5 errors, then will cancel the context.
// We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries)
// before exiting due to context cancellation.
numFailures := 5
failures := 0
client := mockpbresource.NewResourceServiceClient(t)
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: pbhcp.LinkType,
NamePrefix: hcpctl.LinkName,
}).RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
if failures < numFailures {
failures++
return nil, errors.New("unexpectedError")
}
defer cancel()
return mockpbresource.NewResourceService_WatchListClient(t), nil
}).Times(numFailures + 1)
RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {})
}

View File

@@ -18,51 +18,19 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
// RunHCPLinkWatcher watches the HCP Link resource and kicks off a goroutine
// to manage the lifecycle of the HCP manager based on HCP Link events.
//
// StartLinkWatch will use the WatchList API in order to watch the HCP Link
// and return a channel which will contain WatchEvents.
// If there are any errors in doing so, it will continuously retry until successful.
//
// Once the channel is created, we call MonitorHCPLink which runs continuously
// and handles starting/stopping the HCP manager.
func RunHCPLinkWatcher(
ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, m Manager,
hcpClientFn func(cfg config.CloudConfig) (hcpclient.Client, error),
loadMgmtTokenFn func(
ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string,
) (string, error),
cloudConfig config.CloudConfig,
dataDir string,
) {
hcpLinkWatchCh := StartLinkWatch(
ctx,
logger,
client,
)
MonitorHCPLink(ctx, logger, m, hcpLinkWatchCh, hcpClientFn, loadMgmtTokenFn, cloudConfig, dataDir)
}
// MonitorHCPLink monitors the status of the HCP Link and based on that, manages
// the lifecycle of the HCP Manager. It's designed to be run in its own goroutine
// for the life of a server agent. It should be run even if HCP is not configured
// yet for servers. When an HCP Link is created, it will Start the Manager and
// when an HCP Link is deleted, it will Stop the Manager.
func MonitorHCPLink(
ctx context.Context,
logger hclog.Logger,
// HCPManagerLifecycleFn returns a LinkEventHandler function which will appropriately
// Start and Stop the HCP Manager based on the Link event received. If a link is upserted,
// the HCP Manager is started, and if a link is deleted, the HCP manager is stopped.
func HCPManagerLifecycleFn(
m Manager,
hcpLinkEventCh chan *pbresource.WatchEvent,
hcpClientFn func(cfg config.CloudConfig) (hcpclient.Client, error),
loadMgmtTokenFn func(
ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string,
) (string, error),
cloudConfig config.CloudConfig,
dataDir string,
) {
for watchEvent := range hcpLinkEventCh {
) LinkEventHandler {
return func(ctx context.Context, logger hclog.Logger, watchEvent *pbresource.WatchEvent) {
// This indicates that a Link was deleted
if watchEvent.GetDelete() != nil {
logger.Debug("HCP Link deleted, stopping HCP manager")
@@ -80,7 +48,7 @@ func MonitorHCPLink(
if err != nil {
logger.Error("error stopping HCP manager", "error", err)
}
continue
return
}
// This indicates that a Link was either created or updated
@@ -91,12 +59,12 @@ func MonitorHCPLink(
var link pbhcp.Link
if err := res.GetData().UnmarshalTo(&link); err != nil {
logger.Error("error unmarshalling link data", "error", err)
continue
return
}
if validated, reason := hcpctl.IsValidated(res); !validated {
logger.Debug("HCP Link not validated, not starting manager", "reason", reason)
continue
return
}
// Update the HCP manager configuration with the link values
@@ -113,7 +81,7 @@ func MonitorHCPLink(
hcpClient, err := hcpClientFn(mergedCfg)
if err != nil {
logger.Error("error creating HCP client", "error", err)
continue
return
}
// Load the management token if access is set to read-write. Read-only clusters
@@ -123,7 +91,7 @@ func MonitorHCPLink(
token, err = loadMgmtTokenFn(ctx, logger, hcpClient, dataDir)
if err != nil {
logger.Error("error loading management token", "error", err)
continue
return
}
}

View File

@@ -9,7 +9,6 @@ import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"github.com/stretchr/testify/mock"
@@ -27,10 +26,12 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
)
func TestMonitorHCPLink(t *testing.T) {
func TestHCPManagerLifecycleFn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard})
mockHCPClient := hcpclient.NewMockClient(t)
mockHcpClientFn := func(_ config.CloudConfig) (hcpclient.Client, error) {
return mockHCPClient, nil
@@ -178,8 +179,6 @@ func TestMonitorHCPLink(t *testing.T) {
test.applyMocksAndAssertions(t2, mgr, &link)
}
linkWatchCh := make(chan *pbresource.WatchEvent)
testHcpClientFn := mockHcpClientFn
if test.hcpClientFn != nil {
testHcpClientFn = test.hcpClientFn
@@ -190,16 +189,10 @@ func TestMonitorHCPLink(t *testing.T) {
testLoadMgmtToken = test.loadMgmtTokenFn
}
// Start MonitorHCPLink
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
MonitorHCPLink(
ctx, hclog.New(&hclog.LoggerOptions{Output: io.Discard}), mgr, linkWatchCh, testHcpClientFn,
testLoadMgmtToken, existingCfg, dataDir,
)
}()
updateManagerLifecycle := HCPManagerLifecycleFn(
mgr, testHcpClientFn,
testLoadMgmtToken, existingCfg, dataDir,
)
upsertEvent := &pbresource.WatchEvent_Upsert{
Resource: &pbresource.Resource{
@@ -219,22 +212,19 @@ func TestMonitorHCPLink(t *testing.T) {
test.mutateUpsertEvent(upsertEvent)
}
linkWatchCh <- &pbresource.WatchEvent{
// Handle upsert event
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
Event: &pbresource.WatchEvent_Upsert_{
Upsert: upsertEvent,
},
}
})
// Delete link, expect HCP manager to be stopped
linkWatchCh <- &pbresource.WatchEvent{
// Handle delete event. This should stop HCP manager
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
Event: &pbresource.WatchEvent_Delete_{
Delete: &pbresource.WatchEvent_Delete{},
},
}
// Wait for MonitorHCPLink to return before assertions run
close(linkWatchCh)
wg.Wait()
})
// Ensure hcp-config directory is removed
file := filepath.Join(dataDir, constants.SubDir)

View File

@@ -5,7 +5,6 @@ package hcp
import (
"fmt"
hcpctl "github.com/hashicorp/consul/internal/hcp"
"io"
"testing"
"time"
@@ -20,17 +19,17 @@ import (
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
hcpctl "github.com/hashicorp/consul/internal/hcp"
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestManager_MonitorHCPLink(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard})
linkWatchCh := make(chan *pbresource.WatchEvent)
mgr := NewManager(
ManagerConfig{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
@@ -45,9 +44,9 @@ func TestManager_MonitorHCPLink(t *testing.T) {
}
require.False(t, mgr.isRunning())
go MonitorHCPLink(
ctx, hclog.New(&hclog.LoggerOptions{Output: io.Discard}), mgr, linkWatchCh, mockHcpClientFn, loadMgmtTokenFn,
config.CloudConfig{}, "",
updateManagerLifecycle := HCPManagerLifecycleFn(
mgr, mockHcpClientFn,
loadMgmtTokenFn, config.CloudConfig{}, "",
)
// Set up a link
@@ -59,7 +58,7 @@ func TestManager_MonitorHCPLink(t *testing.T) {
}
linkResource, err := anypb.New(&link)
require.NoError(t, err)
linkWatchCh <- &pbresource.WatchEvent{
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
Event: &pbresource.WatchEvent_Upsert_{
Upsert: &pbresource.WatchEvent_Upsert{
Resource: &pbresource.Resource{
@@ -76,14 +75,10 @@ func TestManager_MonitorHCPLink(t *testing.T) {
},
},
},
}
})
// Validate that the HCP manager is started
retry.Run(
t, func(r *retry.R) {
require.True(r, mgr.isRunning())
},
)
require.True(t, mgr.isRunning())
}
func TestManager_Start(t *testing.T) {