internal/hcp: prevent write loop on telemetrystate resource updates

pull/20435/head
Nick Ethier 2024-02-01 11:58:46 -05:00
parent 7c00d396cf
commit 2069bd134a
1 changed files with 54 additions and 19 deletions

View File

@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/internal/controller"
@ -109,20 +110,7 @@ func (r *telemetryStateReconciler) Reconcile(ctx context.Context, rt controller.
state.Metrics.IncludeList = []string{tCfg.MetricsConfig.Filters.String()}
}
stateData, err := anypb.New(state)
if err != nil {
rt.Logger.Error("error marshalling telemetry-state data", "error", err)
return err
}
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
Id: &pbresource.ID{
Name: "global",
Type: pbhcp.TelemetryStateType,
},
Data: stateData,
}})
if err != nil {
if err := writeTelemetryStateIfUpdated(ctx, rt, state); err != nil {
rt.Logger.Error("error updating telemetry-state", "error", err)
return err
}
@ -153,10 +141,33 @@ func ensureTelemetryStateDeleted(ctx context.Context, rt controller.Runtime) err
return nil
}
// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil
// pointer and no error will be returned.
func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) {
resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: pbhcp.LinkType}})
func writeTelemetryStateIfUpdated(ctx context.Context, rt controller.Runtime, state *pbhcp.TelemetryState) error {
currentState, err := getTelemetryStateResource(ctx, rt)
if err != nil {
return err
}
if currentState != nil && proto.Equal(currentState, state) {
return nil
}
stateData, err := anypb.New(state)
if err != nil {
return err
}
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
Id: &pbresource.ID{
Name: "global",
Type: pbhcp.TelemetryStateType,
},
Data: stateData,
}})
return err
}
func getGlobalResource(ctx context.Context, rt controller.Runtime, t *pbresource.Type) (*pbresource.Resource, error) {
resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: t}})
switch {
case status.Code(err) == codes.NotFound:
return nil, nil
@ -164,5 +175,29 @@ func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.Decoded
return nil, err
}
return resource.Decode[*pbhcp.Link](resp.GetResource())
return resp.GetResource(), nil
}
// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil
// pointer and no error will be returned.
func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) {
res, err := getGlobalResource(ctx, rt, pbhcp.LinkType)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return resource.Decode[*pbhcp.Link](res)
}
func getTelemetryStateResource(ctx context.Context, rt controller.Runtime) (*types.DecodedTelemetryState, error) {
res, err := getGlobalResource(ctx, rt, pbhcp.TelemetryStateType)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return resource.Decode[*pbhcp.TelemetryState](res)
}