diff --git a/internal/hcp/internal/controllers/telemetrystate/controller.go b/internal/hcp/internal/controllers/telemetrystate/controller.go index aba7b574ed..e21c85685e 100644 --- a/internal/hcp/internal/controllers/telemetrystate/controller.go +++ b/internal/hcp/internal/controllers/telemetrystate/controller.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/internal/controller" @@ -109,20 +110,7 @@ func (r *telemetryStateReconciler) Reconcile(ctx context.Context, rt controller. state.Metrics.IncludeList = []string{tCfg.MetricsConfig.Filters.String()} } - stateData, err := anypb.New(state) - if err != nil { - rt.Logger.Error("error marshalling telemetry-state data", "error", err) - return err - } - - _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{ - Id: &pbresource.ID{ - Name: "global", - Type: pbhcp.TelemetryStateType, - }, - Data: stateData, - }}) - if err != nil { + if err := writeTelemetryStateIfUpdated(ctx, rt, state); err != nil { rt.Logger.Error("error updating telemetry-state", "error", err) return err } @@ -153,10 +141,33 @@ func ensureTelemetryStateDeleted(ctx context.Context, rt controller.Runtime) err return nil } -// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil -// pointer and no error will be returned. -func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) { - resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: pbhcp.LinkType}}) +func writeTelemetryStateIfUpdated(ctx context.Context, rt controller.Runtime, state *pbhcp.TelemetryState) error { + currentState, err := getTelemetryStateResource(ctx, rt) + if err != nil { + return err + } + + if currentState != nil && proto.Equal(currentState.GetData(), state) { + return nil + } + + stateData, err := anypb.New(state) + if err != nil { + return err + } + + _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "global", + Type: pbhcp.TelemetryStateType, + }, + Data: stateData, + }}) + return err +} + +func getGlobalResource(ctx context.Context, rt controller.Runtime, t *pbresource.Type) (*pbresource.Resource, error) { + resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: t}}) switch { case status.Code(err) == codes.NotFound: return nil, nil @@ -164,5 +175,29 @@ func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.Decoded return nil, err } - return resource.Decode[*pbhcp.Link](resp.GetResource()) + return resp.GetResource(), nil +} + +// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil +// pointer and no error will be returned. +func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) { + res, err := getGlobalResource(ctx, rt, pbhcp.LinkType) + if err != nil { + return nil, err + } + if res == nil { + return nil, nil + } + return resource.Decode[*pbhcp.Link](res) +} + +func getTelemetryStateResource(ctx context.Context, rt controller.Runtime) (*types.DecodedTelemetryState, error) { + res, err := getGlobalResource(ctx, rt, pbhcp.TelemetryStateType) + if err != nil { + return nil, err + } + if res == nil { + return nil, nil + } + return resource.Decode[*pbhcp.TelemetryState](res) } diff --git a/internal/hcp/internal/controllers/telemetrystate/controller_test.go b/internal/hcp/internal/controllers/telemetrystate/controller_test.go index 84df8a27c4..e11c1e3063 100644 --- a/internal/hcp/internal/controllers/telemetrystate/controller_test.go +++ b/internal/hcp/internal/controllers/telemetrystate/controller_test.go @@ -33,8 +33,10 @@ type controllerSuite struct { client *rtest.Client rt controller.Runtime - ctl telemetryStateReconciler + ctl *controller.TestController tenancies []*pbresource.Tenancy + + hcpMock *hcpclient.MockClient } func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, link.HCPClientFn) { @@ -55,10 +57,12 @@ func (suite *controllerSuite) SetupTest() { WithTenancies(suite.tenancies...). Run(suite.T()) - suite.rt = controller.Runtime{ - Client: client, - Logger: testutil.Logger(suite.T()), - } + hcpMock, hcpClientFn := mockHcpClientFn(suite.T()) + suite.hcpMock = hcpMock + suite.ctl = controller.NewTestController(TelemetryStateController(hcpClientFn), client). + WithLogger(testutil.Logger(suite.T())) + + suite.rt = suite.ctl.Runtime() suite.client = rtest.NewClient(client) } @@ -93,23 +97,12 @@ func (suite *controllerSuite) TestController_Ok() { mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) - linkData := &pbhcp.Link{ - ClientId: "abc", - ClientSecret: "abc", - ResourceId: types.GenerateTestResourceID(suite.T()), - } - - link := rtest.Resource(pbhcp.LinkType, "global"). - WithData(suite.T(), linkData). - WithStatus(link.StatusKey, &pbresource.Status{Conditions: []*pbresource.Condition{link.ConditionLinked(linkData.ResourceId)}}). - Write(suite.T(), suite.client) - - suite.T().Cleanup(suite.deleteResourceFunc(link.Id)) + link := suite.writeLinkResource() tsRes := suite.client.WaitForResourceExists(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType}) decodedState, err := resource.Decode[*pbhcp.TelemetryState](tsRes) require.NoError(suite.T(), err) - require.Equal(suite.T(), linkData.ResourceId, decodedState.GetData().ResourceId) + require.Equal(suite.T(), link.GetData().GetResourceId(), decodedState.GetData().ResourceId) require.Equal(suite.T(), "xxx", decodedState.GetData().ClientId) require.Equal(suite.T(), "http://localhost/test", decodedState.GetData().Metrics.Endpoint) @@ -117,6 +110,27 @@ func (suite *controllerSuite) TestController_Ok() { suite.client.WaitForDeletion(suite.T(), tsRes.Id) } +func (suite *controllerSuite) TestReconcile_AvoidReconciliationWriteLoop() { + suite.hcpMock.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&hcpclient.TelemetryConfig{ + MetricsConfig: &hcpclient.MetricsConfig{ + Endpoint: &url.URL{ + Scheme: "http", + Host: "localhost", + Path: "/test", + }, + Labels: map[string]string{"foo": "bar"}, + Filters: regexp.MustCompile(".*"), + }, + RefreshConfig: &hcpclient.RefreshConfig{}, + }, nil) + link := suite.writeLinkResource() + suite.hcpMock.EXPECT().GetObservabilitySecret(mock.Anything).Return("xxx", "yyy", nil) + suite.NoError(suite.ctl.Reconcile(context.Background(), controller.Request{ID: link.Id})) + tsRes := suite.client.WaitForResourceExists(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType}) + suite.NoError(suite.ctl.Reconcile(context.Background(), controller.Request{ID: tsRes.Id})) + suite.client.RequireVersionUnchanged(suite.T(), tsRes.Id, tsRes.Version) +} + func (suite *controllerSuite) TestController_LinkingDisabled() { // Run the controller manager mgr := controller.NewManager(suite.client, suite.rt.Logger) @@ -138,3 +152,23 @@ func (suite *controllerSuite) TestController_LinkingDisabled() { suite.client.WaitForDeletion(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType}) } + +func (suite *controllerSuite) writeLinkResource() *types.DecodedLink { + suite.T().Helper() + + linkData := &pbhcp.Link{ + ClientId: "abc", + ClientSecret: "abc", + ResourceId: types.GenerateTestResourceID(suite.T()), + } + + res := rtest.Resource(pbhcp.LinkType, "global"). + WithData(suite.T(), linkData). + WithStatus(link.StatusKey, &pbresource.Status{Conditions: []*pbresource.Condition{link.ConditionLinked(linkData.ResourceId)}}). + Write(suite.T(), suite.client) + + suite.T().Cleanup(suite.deleteResourceFunc(res.Id)) + link, err := resource.Decode[*pbhcp.Link](res) + require.NoError(suite.T(), err) + return link +}