diff --git a/internal/catalog/internal/controllers/nodehealth/controller.go b/internal/catalog/internal/controllers/nodehealth/controller.go index 8502872169..b1b4e54000 100644 --- a/internal/catalog/internal/controllers/nodehealth/controller.go +++ b/internal/catalog/internal/controllers/nodehealth/controller.go @@ -7,19 +7,22 @@ import ( "context" "fmt" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/controller/cache" + "github.com/hashicorp/consul/internal/controller/cache/indexers" "github.com/hashicorp/consul/internal/controller/dependency" "github.com/hashicorp/consul/internal/resource" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" ) +const ( + nodeOwnerIndexName = "owner" +) + func NodeHealthController() *controller.Controller { return controller.NewController(StatusKey, pbcatalog.NodeType). - WithWatch(pbcatalog.NodeHealthStatusType, dependency.MapOwnerFiltered(pbcatalog.NodeType)). + WithWatch(pbcatalog.NodeHealthStatusType, dependency.MapOwnerFiltered(pbcatalog.NodeType), indexers.OwnerIndex(nodeOwnerIndexName)). WithReconciler(&nodeHealthReconciler{}) } @@ -33,38 +36,36 @@ func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runt rt.Logger.Trace("reconciling node health") // read the node - rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID}) - switch { - case status.Code(err) == codes.NotFound: - rt.Logger.Trace("node has been deleted") - return nil - case err != nil: - rt.Logger.Error("the resource service has returned an unexpected error", "error", err) + node, err := rt.Cache.Get(pbcatalog.NodeType, "id", req.ID) + if err != nil { + rt.Logger.Error("the cache has returned an unexpected error", "error", err) return err } + if node == nil { + rt.Logger.Trace("node has been deleted") + return nil + } - res := rsp.Resource - - health, err := getNodeHealth(ctx, rt, req.ID) + health, err := getNodeHealth(rt, req.ID) if err != nil { rt.Logger.Error("failed to calculate the nodes health", "error", err) return err } newStatus := &pbresource.Status{ - ObservedGeneration: res.Generation, + ObservedGeneration: node.Generation, Conditions: []*pbresource.Condition{ Conditions[health], }, } - if resource.EqualStatus(res.Status[StatusKey], newStatus, false) { + if resource.EqualStatus(node.Status[StatusKey], newStatus, false) { rt.Logger.Trace("resources node health status is unchanged", "health", health.String()) return nil } _, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ - Id: res.Id, + Id: node.Id, Key: StatusKey, Status: newStatus, }) @@ -78,30 +79,24 @@ func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runt return nil } -func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) { - rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{ - Owner: nodeRef, - }) - +func getNodeHealth(rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) { + iter, err := cache.ListIteratorDecoded[*pbcatalog.NodeHealthStatus](rt.Cache, pbcatalog.NodeHealthStatusType, nodeOwnerIndexName, nodeRef) if err != nil { return pbcatalog.Health_HEALTH_CRITICAL, err } health := pbcatalog.Health_HEALTH_PASSING - for _, res := range rsp.Resources { - if resource.EqualType(res.Id.Type, pbcatalog.NodeHealthStatusType) { - var hs pbcatalog.NodeHealthStatus - if err := res.Data.UnmarshalTo(&hs); err != nil { - // This should be impossible as the resource service + type validations the - // catalog is performing will ensure that no data gets written where unmarshalling - // to this type will error. - return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error unmarshalling health status data: %w", err) - } + for hs, err := iter.Next(); hs != nil || err != nil; hs, err = iter.Next() { + if err != nil { + // This should be impossible as the resource service + type validations the + // catalog is performing will ensure that no data gets written where unmarshalling + // to this type will error. + return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error getting decoded health status data: %w", err) + } - if hs.Status > health { - health = hs.Status - } + if hs.Data.Status > health { + health = hs.Data.Status } } diff --git a/internal/catalog/internal/controllers/nodehealth/controller_test.go b/internal/catalog/internal/controllers/nodehealth/controller_test.go index 4219a2f1d2..895e55fff3 100644 --- a/internal/catalog/internal/controllers/nodehealth/controller_test.go +++ b/internal/catalog/internal/controllers/nodehealth/controller_test.go @@ -11,8 +11,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" "github.com/hashicorp/consul/internal/catalog/internal/types" @@ -70,7 +68,7 @@ type nodeHealthControllerTestSuite struct { resourceClient *resourcetest.Client runtime controller.Runtime - ctl nodeHealthReconciler + ctl *controller.TestController nodeNoHealth *pbresource.ID nodePassing *pbresource.ID @@ -97,30 +95,13 @@ func (suite *nodeHealthControllerTestSuite) SetupTest() { WithTenancies(suite.tenancies...). Run(suite.T()) - suite.resourceClient = resourcetest.NewClient(client) - suite.runtime = controller.Runtime{Client: suite.resourceClient, Logger: testutil.Logger(suite.T())} + suite.ctl = controller.NewTestController(NodeHealthController(), client). + WithLogger(testutil.Logger(suite.T())) + suite.runtime = suite.ctl.Runtime() + suite.resourceClient = resourcetest.NewClient(suite.runtime.Client) suite.isEnterprise = versiontest.IsEnterprise() } -func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthListError() { - suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - // This resource id references a resource type that will not be - // registered with the resource service. The ListByOwner call - // should produce an InvalidArgument error. This test is meant - // to validate how that error is handled (its propagated back - // to the caller) - ref := resourceID( - &pbresource.Type{Group: "not", GroupVersion: "v1", Kind: "found"}, - "irrelevant", - tenancy, - ) - health, err := getNodeHealth(context.Background(), suite.runtime, ref) - require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - }) -} - func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { // This test is meant to ensure that when the node doesn't exist @@ -131,7 +112,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { Partition: tenancy.Partition, }) ref.Uid = ulid.Make().String() - health, err := getNodeHealth(context.Background(), suite.runtime, ref) + health, err := getNodeHealth(suite.runtime, ref) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) @@ -141,7 +122,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoNode() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeNoHealth) + health, err := getNodeHealth(suite.runtime, suite.nodeNoHealth) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) }) @@ -150,7 +131,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthNoStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthPassingStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodePassing) + health, err := getNodeHealth(suite.runtime, suite.nodePassing) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) }) @@ -159,7 +140,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthPassingStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthCriticalStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeCritical) + health, err := getNodeHealth(suite.runtime, suite.nodeCritical) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) }) @@ -168,7 +149,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthCriticalStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthWarningStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeWarning) + health, err := getNodeHealth(suite.runtime, suite.nodeWarning) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_WARNING, health) }) @@ -177,7 +158,7 @@ func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthWarningStatus() { func (suite *nodeHealthControllerTestSuite) TestGetNodeHealthMaintenanceStatus() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - health, err := getNodeHealth(context.Background(), suite.runtime, suite.nodeMaintenance) + health, err := getNodeHealth(suite.runtime, suite.nodeMaintenance) require.NoError(suite.T(), err) require.Equal(suite.T(), pbcatalog.Health_HEALTH_MAINTENANCE, health) }) @@ -187,7 +168,7 @@ func (suite *nodeHealthControllerTestSuite) TestReconcileNodeNotFound() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { // This test ensures that removed nodes are ignored. In particular we don't // want to propagate the error and indefinitely keep re-reconciling in this case. - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + err := suite.ctl.Reconcile(context.Background(), controller.Request{ ID: resourceID(pbcatalog.NodeType, "not-found", &pbresource.Tenancy{ Partition: tenancy.Partition, }), @@ -196,31 +177,10 @@ func (suite *nodeHealthControllerTestSuite) TestReconcileNodeNotFound() { }) } -func (suite *nodeHealthControllerTestSuite) TestReconcilePropagateReadError() { - suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { - // This test aims to ensure that errors other than NotFound errors coming - // from the initial resource read get propagated. This case is very unrealistic - // as the controller should not have given us a request ID for a resource type - // that doesn't exist but this was the easiest way I could think of to synthesize - // a Read error. - ref := resourceID( - &pbresource.Type{Group: "not", GroupVersion: "v1", Kind: "found"}, - "irrelevant", - tenancy, - ) - - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ - ID: ref, - }) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - }) -} - func (suite *nodeHealthControllerTestSuite) testReconcileStatus(id *pbresource.ID, expectedStatus *pbresource.Condition) *pbresource.Resource { suite.T().Helper() - err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ + err := suite.ctl.Reconcile(context.Background(), controller.Request{ ID: id, }) require.NoError(suite.T(), err)