diff --git a/internal/multicluster/internal/controllers/exportedservices/builder.go b/internal/multicluster/internal/controllers/exportedservices/builder.go index 06671046f2..65dc190a72 100644 --- a/internal/multicluster/internal/controllers/exportedservices/builder.go +++ b/internal/multicluster/internal/controllers/exportedservices/builder.go @@ -6,6 +6,7 @@ package exportedservices import ( "sort" + expanderTypes "github.com/hashicorp/consul/internal/multicluster/internal/controllers/exportedservices/expander/types" "github.com/hashicorp/consul/internal/multicluster/internal/types" "github.com/hashicorp/consul/internal/resource" pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1" @@ -22,6 +23,7 @@ type exportedServicesBuilder struct { data map[resource.ReferenceKey]*serviceExports samenessGroupsExpander ExportedServicesSamenessGroupExpander samenessGroupsNameToMemberMap map[string][]*pbmulticluster.SamenessGroupMember + missingSamenessGroups map[resource.ReferenceKey][]string } func newExportedServicesBuilder(samenessGroupsExpander ExportedServicesSamenessGroupExpander, samenessGroups []*types.DecodedSamenessGroup) *exportedServicesBuilder { @@ -40,10 +42,28 @@ func newExportedServicesBuilder(samenessGroupsExpander ExportedServicesSamenessG data: make(map[resource.ReferenceKey]*serviceExports), samenessGroupsExpander: samenessGroupsExpander, samenessGroupsNameToMemberMap: samenessGroupsNameToMemberMap, + missingSamenessGroups: make(map[resource.ReferenceKey][]string), } } -func (b *exportedServicesBuilder) track(svcID *pbresource.ID, consumers []*pbmulticluster.ExportedServicesConsumer) error { +// expandConsumers expands the consumers for a given ExportedServices resource +// and keeps track of the unresolved sameness groups +func (b *exportedServicesBuilder) expandConsumers(exportedSvcResourceRef resource.ReferenceKey, consumers []*pbmulticluster.ExportedServicesConsumer) (*expanderTypes.ExpandedConsumers, error) { + expandedConsumers, err := b.samenessGroupsExpander.Expand(consumers, b.samenessGroupsNameToMemberMap) + if err != nil { + return nil, err + } + + if len(expandedConsumers.MissingSamenessGroups) > 0 { + b.missingSamenessGroups[exportedSvcResourceRef] = append(b.missingSamenessGroups[exportedSvcResourceRef], expandedConsumers.MissingSamenessGroups...) + } + + return expandedConsumers, nil +} + +// track associates a service resource with the corresponding partitions +// and peers declared by the various ExportedService resources. +func (b *exportedServicesBuilder) track(svcID *pbresource.ID, expandedConsumers *expanderTypes.ExpandedConsumers) { key := resource.NewReferenceKey(svcID) exports, ok := b.data[key] @@ -56,11 +76,6 @@ func (b *exportedServicesBuilder) track(svcID *pbresource.ID, consumers []*pbmul b.data[key] = exports } - expandedConsumers, err := b.samenessGroupsExpander.Expand(consumers, b.samenessGroupsNameToMemberMap) - if err != nil { - return err - } - for _, p := range expandedConsumers.Partitions { exports.partitions[p] = struct{}{} } @@ -68,10 +83,6 @@ func (b *exportedServicesBuilder) track(svcID *pbresource.ID, consumers []*pbmul for _, p := range expandedConsumers.Peers { exports.peers[p] = struct{}{} } - - // TODO: Handle status for missing sameness groups - - return nil } func (b *exportedServicesBuilder) build() *pbmulticluster.ComputedExportedServices { @@ -119,6 +130,20 @@ func (b *exportedServicesBuilder) build() *pbmulticluster.ComputedExportedServic return ces } +// getMissingSamenessGroupsForComputedExportedService returns back the sorted +// list of unique SamenessGroup names that couldn't be resolved by the builder +// for the ComputedExportedService resource. +func (b *exportedServicesBuilder) getMissingSamenessGroupsForComputedExportedService() []string { + samenessGroupMap := make(map[string]struct{}) + for _, val := range b.missingSamenessGroups { + for _, v := range val { + samenessGroupMap[v] = struct{}{} + } + } + + return sortKeys(samenessGroupMap) +} + func sortKeys(m map[string]struct{}) []string { keys := make([]string, 0, len(m)) for key := range m { diff --git a/internal/multicluster/internal/controllers/exportedservices/controller.go b/internal/multicluster/internal/controllers/exportedservices/controller.go index ecbfed1f63..6a83520ed5 100644 --- a/internal/multicluster/internal/controllers/exportedservices/controller.go +++ b/internal/multicluster/internal/controllers/exportedservices/controller.go @@ -5,6 +5,8 @@ package exportedservices import ( "context" + "fmt" + "strings" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -56,33 +58,30 @@ type reconciler struct { func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { rt.Logger = rt.Logger.With("resource-id", req.ID) rt.Logger.Trace("reconciling exported services") + + tenancy := &pbresource.Tenancy{ + Namespace: storage.Wildcard, + Partition: req.ID.Tenancy.Partition, + } exportedServices, err := resource.ListDecodedResource[*pbmulticluster.ExportedServices](ctx, rt.Client, &pbresource.ListRequest{ - Tenancy: &pbresource.Tenancy{ - Namespace: storage.Wildcard, - Partition: req.ID.Tenancy.Partition, - }, - Type: pbmulticluster.ExportedServicesType, + Tenancy: tenancy, + Type: pbmulticluster.ExportedServicesType, }) if err != nil { rt.Logger.Error("error getting exported services", "error", err) return err } namespaceExportedServices, err := resource.ListDecodedResource[*pbmulticluster.NamespaceExportedServices](ctx, rt.Client, &pbresource.ListRequest{ - Tenancy: &pbresource.Tenancy{ - Namespace: storage.Wildcard, - Partition: req.ID.Tenancy.Partition, - }, - Type: pbmulticluster.NamespaceExportedServicesType, + Tenancy: tenancy, + Type: pbmulticluster.NamespaceExportedServicesType, }) if err != nil { rt.Logger.Error("error getting namespace exported services", "error", err) return err } partitionedExportedServices, err := resource.ListDecodedResource[*pbmulticluster.PartitionExportedServices](ctx, rt.Client, &pbresource.ListRequest{ - Tenancy: &pbresource.Tenancy{ - Partition: req.ID.Tenancy.Partition, - }, - Type: pbmulticluster.PartitionExportedServicesType, + Tenancy: tenancy, + Type: pbmulticluster.PartitionExportedServicesType, }) if err != nil { rt.Logger.Error("error getting partitioned exported services", "error", err) @@ -114,7 +113,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c rt.Logger.Error("error getting services", "error", err) return err } - + servicesIds := make([]*pbresource.ID, 0, len(services)) samenessGroups, err := r.samenessGroupExpander.List(ctx, rt, req) if err != nil { rt.Logger.Error("failed to fetch sameness groups", err) @@ -126,88 +125,167 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c svcs := make(map[resource.ReferenceKey]struct{}, len(services)) for _, svc := range services { svcs[resource.NewReferenceKey(svc.Id)] = struct{}{} + servicesIds = append(servicesIds, svc.Id) } + exportedServicesRefMap := make(map[resource.ReferenceKey]*pbresource.Resource) for _, es := range exportedServices { + + var serviceIdsExpServices []*pbresource.ID for _, svc := range es.Data.Services { - id := &pbresource.ID{ + svcId := &pbresource.ID{ Type: pbcatalog.ServiceType, Tenancy: es.Id.Tenancy, Name: svc, } - if _, ok := svcs[resource.NewReferenceKey(id)]; ok { - if err := builder.track(id, es.Data.Consumers); err != nil { - rt.Logger.Error("error tracking service for exported service", - "exported_service", es.Id.Name, - "service", id.Name, - "error", err, - ) - return err - } - } + serviceIdsExpServices = append(serviceIdsExpServices, svcId) + } + + err = processExportedService[*pbmulticluster.ExportedServices](es, exportedServicesRefMap, builder, rt, svcs, + serviceIdsExpServices, es.Data.Consumers) + if err != nil { + rt.Logger.Error("error processing exported services", es.Id.Name, "error", err) + return err } } for _, nes := range namespaceExportedServices { + + var serviceIdsNamespaceExpServices []*pbresource.ID for _, svc := range services { if svc.Id.Tenancy.Namespace != nes.Id.Tenancy.Namespace { continue } - if err := builder.track(svc.Id, nes.Data.Consumers); err != nil { - rt.Logger.Error("error tracking service for namespace exported service", - "exported_service", nes.Id.Name, - "service", svc.Id.Name, - "error", err, - ) - return err - } + serviceIdsNamespaceExpServices = append(serviceIdsNamespaceExpServices, svc.Id) + } + + err = processExportedService[*pbmulticluster.NamespaceExportedServices](nes, exportedServicesRefMap, builder, rt, svcs, + serviceIdsNamespaceExpServices, nes.Data.Consumers) + if err != nil { + rt.Logger.Error("error processing namespace exported services", nes.Id.Name, "error", err) + return err } } for _, pes := range partitionedExportedServices { - for _, svc := range services { - if err := builder.track(svc.Id, pes.Data.Consumers); err != nil { - rt.Logger.Error("error tracking service for partition exported service", - "exported_service", pes.Id.Name, - "service", svc.Id.Name, - "error", err, - ) - return err - } + err = processExportedService[*pbmulticluster.PartitionExportedServices](pes, exportedServicesRefMap, builder, rt, svcs, + servicesIds, pes.Data.Consumers) + if err != nil { + rt.Logger.Error("error processing partition exported services", pes.Id.Name, "error", err) + return err } } + newComputedExportedService := builder.build() if oldComputedExportedService.GetResource() != nil && newComputedExportedService == nil { rt.Logger.Trace("deleting computed exported services") if err := deleteResource(ctx, rt, oldComputedExportedService.GetResource()); err != nil { rt.Logger.Error("error deleting computed exported service", "error", err) + writeStatus(ctx, rt, oldComputedExportedService.Resource, []*pbresource.Condition{conditionNotComputed(err.Error())}) return err } return nil } - if proto.Equal(newComputedExportedService, oldComputedExportedService.GetData()) { - rt.Logger.Trace("skip writing computed exported services") + + shouldUpdateResource := !proto.Equal(newComputedExportedService, oldComputedExportedService.GetData()) + computedExportedServiceResource := oldComputedExportedService.GetResource() + if shouldUpdateResource { + newComputedExportedServiceData, err := anypb.New(newComputedExportedService) + if err != nil { + rt.Logger.Error("error marshalling latest computed exported service", "error", err) + return err + } + + rt.Logger.Trace("writing computed exported services") + resp, err := rt.Client.Write(ctx, &pbresource.WriteRequest{ + Resource: &pbresource.Resource{ + Id: req.ID, + Owner: nil, + Data: newComputedExportedServiceData, + }, + }) + if err != nil { + rt.Logger.Error("error writing computed exported service", "error", err) + writeStatus(ctx, rt, oldComputedExportedService.Resource, []*pbresource.Condition{conditionNotComputed(err.Error())}) + return err + } + + computedExportedServiceResource = resp.Resource + } + + if computedExportedServiceResource == nil { + rt.Logger.Debug("skipping status update for nil resource") return nil } - newComputedExportedServiceData, err := anypb.New(newComputedExportedService) + + missingSamenessGroups := builder.getMissingSamenessGroupsForComputedExportedService() + if len(missingSamenessGroups) == 0 { + return writeStatus(ctx, rt, computedExportedServiceResource, []*pbresource.Condition{ + conditionComputed(), + }) + } + + err = writeStatus(ctx, + rt, + computedExportedServiceResource, + []*pbresource.Condition{ + conditionComputed(), + conditionMissingSamenessGroups(getSamenessGroupsUnresolvedErrorMsg(missingSamenessGroups)), + }, + ) if err != nil { - rt.Logger.Error("error marshalling latest computed exported service", "error", err) return err } - rt.Logger.Trace("writing computed exported services") - _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{ - Resource: &pbresource.Resource{ - Id: req.ID, - Owner: nil, - Data: newComputedExportedServiceData, - }, - }) + // Write the failed status to ExportedServices, NamespaceExportedServices + // and PartitionedExportedServices which have missing sameness group + // references. + for ref, sgList := range builder.missingSamenessGroups { + expSvcRes, ok := exportedServicesRefMap[ref] + if !ok { + panic("unexpected resource ref") + } + + sgMap := make(map[string]struct{}) + for _, sg := range sgList { + sgMap[sg] = struct{}{} + } + + err = writeStatus(ctx, + rt, + expSvcRes, + []*pbresource.Condition{ + conditionMissingSamenessGroups(getSamenessGroupsUnresolvedErrorMsg(sortKeys(sgMap))), + }, + ) + if err != nil { + return err + } + } + return nil +} + +func processExportedService[T proto.Message](es *resource.DecodedResource[T], + exportedServicesRefMap map[resource.ReferenceKey]*pbresource.Resource, + builder *exportedServicesBuilder, rt controller.Runtime, svcs map[resource.ReferenceKey]struct{}, + services []*pbresource.ID, consumers []*pbmulticluster.ExportedServicesConsumer) error { + + ref := resource.NewReferenceKey(es.Id) + exportedServicesRefMap[ref] = es.Resource + expandedConsumers, err := builder.expandConsumers(ref, consumers) if err != nil { - rt.Logger.Error("error writing computed exported service", "error", err) + rt.Logger.Error("error expanding consumers for exported service", + "exported_service", "exported services type", es.Id.Name, + "error", err, + ) return err } + for _, svc := range services { + if _, ok := svcs[resource.NewReferenceKey(svc)]; ok { + builder.track(svc, expandedConsumers) + } + } return nil } @@ -227,6 +305,33 @@ func ReplaceTypeForComputedExportedServices() controller.DependencyMapper { } } +func writeStatus(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, conditions []*pbresource.Condition) error { + if res == nil { + return nil + } + + newStatus := &pbresource.Status{ + ObservedGeneration: res.Generation, + Conditions: conditions, + } + + if resource.EqualStatus(res.Status[statusKey], newStatus, false) { + rt.Logger.Debug("skipping status update for resource", "resource", res.Id) + return nil + } + + _, err := rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ + Id: res.Id, + Key: statusKey, + Status: newStatus, + }) + return err +} + +func getSamenessGroupsUnresolvedErrorMsg(unresolvedSGs []string) string { + return fmt.Sprintf("Some SamenessGroups cannot be resolved : %s", strings.Join(unresolvedSGs, ",")) +} + func getOldComputedExportedService(ctx context.Context, rt controller.Runtime, req controller.Request) (*resource.DecodedResource[*pbmulticluster.ComputedExportedServices], error) { computedExpSvcID := &pbresource.ID{ Name: types.ComputedExportedServicesName, diff --git a/internal/multicluster/internal/controllers/exportedservices/controller_test.go b/internal/multicluster/internal/controllers/exportedservices/controller_test.go index 53cd76bbbd..5548bace96 100644 --- a/internal/multicluster/internal/controllers/exportedservices/controller_test.go +++ b/internal/multicluster/internal/controllers/exportedservices/controller_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "google.golang.org/grpc" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" "github.com/hashicorp/consul/internal/catalog" @@ -189,9 +190,15 @@ func (suite *controllerSuite) TestReconcile_SkipWritingNewCES() { oldCESData.Services[0].Consumers = append(oldCESData.Services[0].Consumers, suite.constructConsumer("part-n", "partition")) } + oldStatus := &pbresource.Status{ + Conditions: []*pbresource.Condition{ + conditionComputed(), + }, + } oldCES := rtest.Resource(pbmulticluster.ComputedExportedServicesType, "global"). WithData(suite.T(), oldCESData). WithTenancy(&pbresource.Tenancy{Partition: tenancy.Partition}). + WithStatus(statusKey, oldStatus). Write(suite.T(), suite.client) require.NotNil(suite.T(), oldCES) @@ -238,6 +245,85 @@ func (suite *controllerSuite) TestReconcile_SkipWritingNewCES() { }) } +func (suite *controllerSuite) TestReconcile_SkipWritingNewCES_WithStatusUpdate() { + // This test's purpose is to ensure that we skip + // writing the new CES when there are no changes to + // the existing one but write the status if there + // is a mismatch + + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + oldCESData := &pbmulticluster.ComputedExportedServices{ + Services: []*pbmulticluster.ComputedExportedService{ + { + TargetRef: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Tenancy: tenancy, + Name: "svc-0", + }, + Consumers: []*pbmulticluster.ComputedExportedServiceConsumer{ + suite.constructConsumer("peer-1", "peer"), + }, + }, + }, + } + + if suite.isEnterprise { + oldCESData.Services[0].Consumers = append(oldCESData.Services[0].Consumers, suite.constructConsumer("part-n", "partition")) + } + + oldCES := rtest.Resource(pbmulticluster.ComputedExportedServicesType, "global"). + WithData(suite.T(), oldCESData). + WithTenancy(&pbresource.Tenancy{Partition: tenancy.Partition}). + Write(suite.T(), suite.client) + require.NotNil(suite.T(), oldCES) + + // Export the svc-0 service to just a peer + exportedSvcData := &pbmulticluster.ExportedServices{ + Services: []string{"svc-0"}, + Consumers: []*pbmulticluster.ExportedServicesConsumer{ + {ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Peer{Peer: "peer-1"}}, + }, + } + _ = rtest.Resource(pbmulticluster.ExportedServicesType, "exported-svcs"). + WithData(suite.T(), exportedSvcData). + WithTenancy(tenancy). + Write(suite.T(), suite.client) + + if suite.isEnterprise { + // Export all services in a given partition to `part-n` partition + pesData := &pbmulticluster.PartitionExportedServices{ + Consumers: []*pbmulticluster.ExportedServicesConsumer{ + {ConsumerTenancy: &pbmulticluster.ExportedServicesConsumer_Partition{Partition: "part-n"}}, + }, + } + _ = rtest.Resource(pbmulticluster.PartitionExportedServicesType, "pes"). + WithData(suite.T(), pesData). + WithTenancy(&pbresource.Tenancy{Partition: tenancy.Partition}). + Write(suite.T(), suite.client) + } + + svcData := &pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + } + _ = rtest.Resource(pbcatalog.ServiceType, "svc-0"). + WithData(suite.T(), svcData). + WithTenancy(tenancy). + Write(suite.T(), suite.client) + + passThroughClient := newPassThroughResourceClient(suite.client) + rt := suite.controllerRuntimeWithPassThroughClient(passThroughClient) + err := suite.reconciler.Reconcile(suite.ctx, rt, controller.Request{ID: oldCES.Id}) + require.NoError(suite.T(), err) + + // Checking version change to ensure that the status gets updated + newCES := suite.client.RequireVersionChanged(suite.T(), oldCES.Id, oldCES.Version) + rtest.RequireStatusCondition(suite.T(), newCES, statusKey, conditionComputed()) + require.Equal(suite.T(), 0, passThroughClient.writesCount) + }) +} + func (suite *controllerSuite) TestReconcile_ComputeCES() { suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { suite.writeService("svc-0", tenancy) @@ -281,7 +367,9 @@ func (suite *controllerSuite) TestReconcile_ComputeCES() { err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id}) require.NoError(suite.T(), err) - computedCES := suite.getComputedExportedSvc(id) + res := suite.client.RequireResourceExists(suite.T(), id) + computedCES := suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) var expectedCES *pbmulticluster.ComputedExportedServices if suite.isEnterprise { @@ -379,7 +467,8 @@ func (suite *controllerSuite) TestController() { require.NotNil(suite.T(), expSvc) res := suite.client.WaitForResourceExists(suite.T(), id) - computedCES := suite.getComputedExportedSvc(id) + computedCES := suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService := constructComputedExportedServices( constructComputedExportedService( @@ -403,7 +492,9 @@ func (suite *controllerSuite) TestController() { namespaceExportedSvc := suite.writeNamespaceExportedService("namesvc", tenancy, exportedNamespaceSvcData) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) + expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc1", tenancy), @@ -426,7 +517,8 @@ func (suite *controllerSuite) TestController() { svc3 := suite.writeService("svc3", tenancy) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc1", tenancy), @@ -455,7 +547,8 @@ func (suite *controllerSuite) TestController() { suite.client.MustDelete(suite.T(), svc3.Id) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc1", tenancy), @@ -485,7 +578,8 @@ func (suite *controllerSuite) TestController() { partExpService := suite.writePartitionedExportedService("partsvc", tenancy, partitionedExportedSvcData) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -516,7 +610,8 @@ func (suite *controllerSuite) TestController() { svc4 := suite.writeService("svc4", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -554,7 +649,8 @@ func (suite *controllerSuite) TestController() { suite.client.MustDelete(suite.T(), svc4.Id) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -586,7 +682,8 @@ func (suite *controllerSuite) TestController() { suite.writeService("svc5", tenancy) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -624,7 +721,8 @@ func (suite *controllerSuite) TestController() { suite.client.MustDelete(suite.T(), partExpService.Id) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc1", tenancy), @@ -651,7 +749,8 @@ func (suite *controllerSuite) TestController() { suite.client.MustDelete(suite.T(), namespaceExportedSvc.Id) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc1", tenancy), @@ -669,7 +768,8 @@ func (suite *controllerSuite) TestController() { namespaceExportedSvc = suite.writeNamespaceExportedService("namesvc1", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}, exportedNamespaceSvcData) res = suite.client.WaitForResourceExists(suite.T(), id) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -682,7 +782,8 @@ func (suite *controllerSuite) TestController() { expSvc = suite.writeExportedService("expsvc1", tenancy, exportedSvcData) res = suite.client.WaitForNewVersion(suite.T(), id, res.Version) - computedCES = suite.getComputedExportedSvc(res.Id) + computedCES = suite.getComputedExportedSvcData(res) + rtest.RequireStatusCondition(suite.T(), res, statusKey, conditionComputed()) expectedComputedExportedService = constructComputedExportedServices( constructComputedExportedService( constructSvcReference("svc0", &pbresource.Tenancy{Partition: tenancy.Partition, Namespace: "app"}), @@ -725,9 +826,8 @@ func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) str return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition) } -func (suite *controllerSuite) getComputedExportedSvc(id *pbresource.ID) *pbmulticluster.ComputedExportedServices { - computedExportedService := suite.client.RequireResourceExists(suite.T(), id) - decodedComputedExportedService := rtest.MustDecode[*pbmulticluster.ComputedExportedServices](suite.T(), computedExportedService) +func (suite *controllerSuite) getComputedExportedSvcData(ces *pbresource.Resource) *pbmulticluster.ComputedExportedServices { + decodedComputedExportedService := rtest.MustDecode[*pbmulticluster.ComputedExportedServices](suite.T(), ces) return decodedComputedExportedService.Data } @@ -783,6 +883,14 @@ func (suite *controllerSuite) constructConsumer(name, consumerType string) *pbmu } } +func (suite *controllerSuite) controllerRuntimeWithPassThroughClient(client *passThroughResourceClient) controller.Runtime { + return controller.Runtime{ + Cache: suite.rt.Cache, + Logger: suite.rt.Logger, + Client: client, + } +} + func constructComputedExportedService(ref *pbresource.Reference, consumers []*pbmulticluster.ComputedExportedServiceConsumer) *pbmulticluster.ComputedExportedService { finalConsumers := make([]*pbmulticluster.ComputedExportedServiceConsumer, 0) for _, c := range consumers { @@ -812,3 +920,57 @@ func constructSvcReference(name string, tenancy *pbresource.Tenancy) *pbresource Name: name, } } + +type passThroughResourceClient struct { + client pbresource.ResourceServiceClient + + writesCount int + writeStatusCount int +} + +// newPassThroughResourceClient returns a client that implements pbresource.ResourceServiceClient +// It can be used to keep track of operations happening within a controller +func newPassThroughResourceClient(client *rtest.Client) *passThroughResourceClient { + return &passThroughResourceClient{ + client: client, + } +} + +func (pc *passThroughResourceClient) resetCounters() { + pc.writeStatusCount = 0 + pc.writesCount = 0 +} + +func (pc *passThroughResourceClient) Read(ctx context.Context, in *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) { + return pc.client.Read(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) Write(ctx context.Context, in *pbresource.WriteRequest, opts ...grpc.CallOption) (*pbresource.WriteResponse, error) { + pc.writesCount++ + return pc.client.Write(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) WriteStatus(ctx context.Context, in *pbresource.WriteStatusRequest, opts ...grpc.CallOption) (*pbresource.WriteStatusResponse, error) { + pc.writeStatusCount++ + return pc.client.WriteStatus(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) Delete(ctx context.Context, in *pbresource.DeleteRequest, opts ...grpc.CallOption) (*pbresource.DeleteResponse, error) { + return pc.client.Delete(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) List(ctx context.Context, in *pbresource.ListRequest, opts ...grpc.CallOption) (*pbresource.ListResponse, error) { + return pc.client.List(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) ListByOwner(ctx context.Context, in *pbresource.ListByOwnerRequest, opts ...grpc.CallOption) (*pbresource.ListByOwnerResponse, error) { + return pc.client.ListByOwner(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) WatchList(ctx context.Context, in *pbresource.WatchListRequest, opts ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) { + return pc.client.WatchList(ctx, in, opts...) +} + +func (pc *passThroughResourceClient) MutateAndValidate(ctx context.Context, in *pbresource.MutateAndValidateRequest, opts ...grpc.CallOption) (*pbresource.MutateAndValidateResponse, error) { + return pc.client.MutateAndValidate(ctx, in, opts...) +} diff --git a/internal/multicluster/internal/controllers/exportedservices/status.go b/internal/multicluster/internal/controllers/exportedservices/status.go new file mode 100644 index 0000000000..5e393093e9 --- /dev/null +++ b/internal/multicluster/internal/controllers/exportedservices/status.go @@ -0,0 +1,42 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package exportedservices + +import ( + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + statusKey = "consul.io/exported-services" + + statusExportedServicesComputed = "ExportedServicesComputed" + statusMissingSamenessGroups = "MissingSamenessGroups" + + msgExportedServicesComputed = "Exported services have been computed" +) + +func conditionComputed() *pbresource.Condition { + return &pbresource.Condition{ + Type: statusExportedServicesComputed, + State: pbresource.Condition_STATE_TRUE, + Message: msgExportedServicesComputed, + } +} + +func conditionNotComputed(message string) *pbresource.Condition { + return &pbresource.Condition{ + Type: statusExportedServicesComputed, + State: pbresource.Condition_STATE_FALSE, + Message: message, + } +} + +func conditionMissingSamenessGroups(message string) *pbresource.Condition { + return &pbresource.Condition{ + Type: statusMissingSamenessGroups, + State: pbresource.Condition_STATE_TRUE, + Reason: "MissingSamenessGroups", + Message: message, + } +}