diff --git a/internal/hcp/watch.go b/internal/hcp/watch.go index 2fab2ae589..3e3e3363d2 100644 --- a/internal/hcp/watch.go +++ b/internal/hcp/watch.go @@ -6,7 +6,6 @@ package hcp import ( "context" "fmt" - "time" "github.com/hashicorp/go-hclog" @@ -15,7 +14,7 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -var linkWatchRetryTime = time.Second +var linkWatchRetryCount = 10 func NewLinkWatch(ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient) (chan *pbresource.WatchEvent, error) { watchClient, err := client.WatchList( @@ -30,24 +29,30 @@ func NewLinkWatch(ctx context.Context, logger hclog.Logger, client pbresource.Re eventCh := make(chan *pbresource.WatchEvent) go func() { + errorCounter := 0 for { - watchEvent, err := watchClient.Recv() - if err != nil { - select { - case <-ctx.Done(): - logger.Debug("context canceled, exiting") - close(eventCh) - return - default: + select { + case <-ctx.Done(): + logger.Debug("context canceled, exiting") + close(eventCh) + return + default: + watchEvent, err := watchClient.Recv() + if err != nil { logger.Error("error receiving link watch event", "error", err) - // In case of an error, wait before retrying, so we don't log errors in a fast loop - time.Sleep(linkWatchRetryTime) + errorCounter++ + if errorCounter >= linkWatchRetryCount { + logger.Error("received multiple consecutive errors from link watch client, will stop watching link") + close(eventCh) + return + } + continue } - + errorCounter = 0 + eventCh <- watchEvent } - eventCh <- watchEvent } }() diff --git a/internal/hcp/watch_test.go b/internal/hcp/watch_test.go index 44078d54ea..54b26ca0ff 100644 --- a/internal/hcp/watch_test.go +++ b/internal/hcp/watch_test.go @@ -5,99 +5,88 @@ package hcp import ( "context" + "errors" "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "github.com/hashicorp/go-hclog" - svctest
"github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" - "github.com/hashicorp/consul/internal/controller" + mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource" "github.com/hashicorp/consul/internal/hcp/internal/types" - rtest "github.com/hashicorp/consul/internal/resource/resourcetest" pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2" "github.com/hashicorp/consul/proto-public/pbresource" - "github.com/hashicorp/consul/sdk/testutil" ) -type controllerSuite struct { - suite.Suite +// This tests that when we get a watch event from the Recv call, we get that same event on the +// output channel. +func TestLinkWatch_Ok(t *testing.T) { + testWatchEvent := &pbresource.WatchEvent{} + + mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) + mockWatchListClient.EXPECT().Recv().Return(testWatchEvent, nil) + + client := mockpbresource.NewResourceServiceClient(t) + client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: types.LinkName, + }).Return(mockWatchListClient, nil) + linkWatchCh, err := NewLinkWatch(context.Background(), hclog.Default(), client) + require.NoError(t, err) + + require.Eventually(t, func() bool { + select { + case event := <-linkWatchCh: + return event == testWatchEvent + default: + return false + } + }, 10*time.Millisecond, time.Millisecond) +} - ctx context.Context - client *rtest.Client - rt controller.Runtime +// This test ensures that when the context is canceled, the linkWatchCh is closed +func TestLinkWatch_ContextCanceled(t *testing.T) { + mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) + // Recv may not be called if the context is cancelled before the `select` block + // within the NewLinkWatch goroutine + mockWatchListClient.EXPECT().Recv().Return(nil, errors.New("context canceled")).Maybe() - tenancies []*pbresource.Tenancy - dataDir string -} + client :=
mockpbresource.NewResourceServiceClient(t) + client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: types.LinkName, + }).Return(mockWatchListClient, nil) -func (suite *controllerSuite) SetupTest() { - suite.ctx = testutil.TestContext(suite.T()) - suite.tenancies = rtest.TestTenancies() - client := svctest.NewResourceServiceBuilder(). - WithRegisterFns(types.Register). - WithTenancies(suite.tenancies...). - Run(suite.T()) - - suite.rt = controller.Runtime{ - Client: client, - Logger: testutil.Logger(suite.T()), - } - suite.client = rtest.NewClient(client) -} + ctx, cancel := context.WithCancel(context.Background()) + linkWatchCh, err := NewLinkWatch(ctx, hclog.Default(), client) + require.NoError(t, err) -func TestLinkWatch(t *testing.T) { - suite.Run(t, new(controllerSuite)) -} + cancel() -func (suite *controllerSuite) deleteResourceFunc(id *pbresource.ID) func() { - return func() { - suite.client.MustDelete(suite.T(), id) - } + // Ensure the linkWatchCh is closed + _, ok := <-linkWatchCh + require.False(t, ok) } -func (suite *controllerSuite) TestLinkWatch_Ok() { - // Run the controller manager - linkData := &pbhcp.Link{ - ClientId: "abc", - ClientSecret: "abc", - ResourceId: types.GenerateTestResourceID(suite.T()), - } - - linkWatchCh, err := NewLinkWatch(suite.ctx, hclog.Default(), suite.client) - require.NoError(suite.T(), err) - - link := rtest.Resource(pbhcp.LinkType, "global"). - WithData(suite.T(), linkData). 
- Write(suite.T(), suite.client) - - // The first event will always be the "end of snapshot" event - endOfSnapshotEvent := <-linkWatchCh - require.NotNil(suite.T(), endOfSnapshotEvent.GetEndOfSnapshot()) - - select { - case watchEvent := <-linkWatchCh: - require.NotNil(suite.T(), watchEvent.GetUpsert()) - res := watchEvent.GetUpsert().GetResource() - var upsertedLink pbhcp.Link - err := res.GetData().UnmarshalTo(&upsertedLink) - require.NoError(suite.T(), err) - require.Equal(suite.T(), linkData.ClientId, upsertedLink.ClientId) - require.Equal(suite.T(), linkData.ClientSecret, upsertedLink.ClientSecret) - require.Equal(suite.T(), linkData.ResourceId, upsertedLink.ResourceId) - case <-time.After(time.Second): - require.Fail(suite.T(), "nothing emitted on link watch channel for upsert") - } - - _, err = suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: link.Id}) - require.NoError(suite.T(), err) - - select { - case watchEvent := <-linkWatchCh: - require.NotNil(suite.T(), watchEvent.GetDelete()) - case <-time.After(time.Second): - require.Fail(suite.T(), "nothing emitted on link watch channel for delete") - } +// This test ensures that when Recv returns errors repeatedly, we eventually close the channel +// and exit the goroutine +func TestLinkWatch_RepeatErrors(t *testing.T) { + mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t) + // Recv should be called linkWatchRetryCount times and no more, since it repeatedly returns an error.
+ mockWatchListClient.EXPECT().Recv().Return(nil, errors.New("unexpected error")).Times(linkWatchRetryCount) + + client := mockpbresource.NewResourceServiceClient(t) + client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{ + Type: pbhcp.LinkType, + NamePrefix: types.LinkName, + }).Return(mockWatchListClient, nil) + + linkWatchCh, err := NewLinkWatch(context.Background(), hclog.Default(), client) + require.NoError(t, err) + + // Ensure the linkWatchCh is closed + _, ok := <-linkWatchCh + require.False(t, ok) }