diff --git a/internal/catalog/internal/controllers/endpoints/controller_test.go b/internal/catalog/internal/controllers/endpoints/controller_test.go index 5a0155ef76..6bf54a1435 100644 --- a/internal/catalog/internal/controllers/endpoints/controller_test.go +++ b/internal/catalog/internal/controllers/endpoints/controller_test.go @@ -5,6 +5,7 @@ package endpoints import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/consul/internal/catalog/internal/types" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource/mappers/selectiontracker" + "github.com/hashicorp/consul/internal/resource/resourcetest" rtest "github.com/hashicorp/consul/internal/resource/resourcetest" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" @@ -441,11 +443,13 @@ type controllerSuite struct { tracker *selectiontracker.WorkloadSelectionTracker reconciler *serviceEndpointsReconciler + tenancies []*pbresource.Tenancy } func (suite *controllerSuite) SetupTest() { + suite.tenancies = resourcetest.TestTenancies() suite.ctx = testutil.TestContext(suite.T()) - client := svctest.RunResourceService(suite.T(), types.Register) + client := svctest.RunResourceServiceWithTenancies(suite.T(), types.Register) suite.rt = controller.Runtime{ Client: client, Logger: testutil.Logger(suite.T()), @@ -478,25 +482,28 @@ func (suite *controllerSuite) TestReconcile_ServiceNotFound() { // generate a workload resource to use for checking if it maps // to a service endpoints object - workload := rtest.Resource(pbcatalog.WorkloadType, "foo").Build() - // ensure that the tracker knows about the service prior to - // calling reconcile so that we can ensure it removes tracking - id := rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").ID() - suite.tracker.TrackIDForSelector(id, &pbcatalog.WorkloadSelector{Prefixes: []string{""}}) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + workload := rtest.Resource(pbcatalog.WorkloadType, "foo").WithTenancy(tenancy).Build() - // verify that mapping the workload to service endpoints returns a - // non-empty list prior to reconciliation which should remove the - // tracking. - suite.requireTracking(workload, id) + // ensure that the tracker knows about the service prior to + // calling reconcile so that we can ensure it removes tracking + id := rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").WithTenancy(tenancy).ID() + suite.tracker.TrackIDForSelector(id, &pbcatalog.WorkloadSelector{Prefixes: []string{""}}) - // Because the endpoints don't exist, this reconcile call should - // cause tracking of the endpoints to be removed - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id}) - require.NoError(suite.T(), err) + // verify that mapping the workload to service endpoints returns a + // non-empty list prior to reconciliation which should remove the + // tracking. + suite.requireTracking(workload, id) + + // Because the endpoints don't exist, this reconcile call should + // cause tracking of the endpoints to be removed + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id}) + require.NoError(suite.T(), err) - // Now ensure that the tracking was removed - suite.requireTracking(workload) + // Now ensure that the tracking was removed + suite.requireTracking(workload) + }) } func (suite *controllerSuite) TestReconcile_NoSelector_NoEndpoints() { @@ -505,20 +512,23 @@ func (suite *controllerSuite) TestReconcile_NoSelector_NoEndpoints() { // managed. Additionally, with no endpoints pre-existing it will // not attempt to delete them. - service := rtest.Resource(pbcatalog.ServiceType, "test"). - WithData(suite.T(), &pbcatalog.Service{ - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + service := rtest.Resource(pbcatalog.ServiceType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) - endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").ID() + endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").WithTenancy(tenancy).ID() - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID}) - require.NoError(suite.T(), err) + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID}) + require.NoError(suite.T(), err) - suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) + suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) + }) } func (suite *controllerSuite) TestReconcile_NoSelector_ManagedEndpoints() { @@ -526,26 +536,30 @@ func (suite *controllerSuite) TestReconcile_NoSelector_ManagedEndpoints() { // to unmanaged endpoints for a service, any already generated managed endpoints // get deleted. - service := rtest.Resource(pbcatalog.ServiceType, "test"). - WithData(suite.T(), &pbcatalog.Service{ - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) - - endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). - WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). - // this marks these endpoints as under management - WithMeta(endpointsMetaManagedBy, StatusKey). - Write(suite.T(), suite.client) - - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) - require.NoError(suite.T(), err) - // the status should indicate the services endpoints are not being managed - suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) - // endpoints under management should be deleted - suite.client.RequireResourceNotFound(suite.T(), endpoints.Id) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + service := rtest.Resource(pbcatalog.ServiceType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) + + endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). + // this marks these endpoints as under management + WithMeta(endpointsMetaManagedBy, StatusKey). + Write(suite.T(), suite.client) + + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) + require.NoError(suite.T(), err) + // the status should indicate the services endpoints are not being managed + suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) + // endpoints under management should be deleted + suite.client.RequireResourceNotFound(suite.T(), endpoints.Id) + }) } func (suite *controllerSuite) TestReconcile_NoSelector_UnmanagedEndpoints() { @@ -553,65 +567,73 @@ func (suite *controllerSuite) TestReconcile_NoSelector_UnmanagedEndpoints() { // doesn't have its endpoints managed, that we do not delete any unmanaged // ServiceEndpoints resource that the user would have manually written. - service := rtest.Resource(pbcatalog.ServiceType, "test"). - WithData(suite.T(), &pbcatalog.Service{ - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) - - endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). - WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). - Write(suite.T(), suite.client) - - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) - require.NoError(suite.T(), err) - // the status should indicate the services endpoints are not being managed - suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) - // unmanaged endpoints should not be deleted when the service is unmanaged - suite.client.RequireResourceExists(suite.T(), endpoints.Id) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + service := rtest.Resource(pbcatalog.ServiceType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) + + endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). + Write(suite.T(), suite.client) + + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) + require.NoError(suite.T(), err) + // the status should indicate the services endpoints are not being managed + suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged) + // unmanaged endpoints should not be deleted when the service is unmanaged + suite.client.RequireResourceExists(suite.T(), endpoints.Id) + }) } func (suite *controllerSuite) TestReconcile_Managed_NoPreviousEndpoints() { // This test's purpose is to ensure the managed endpoint generation occurs // as expected when there are no pre-existing endpoints. - service := rtest.Resource(pbcatalog.ServiceType, "test"). - WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Prefixes: []string{""}, - }, - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + service := rtest.Resource(pbcatalog.ServiceType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{""}, + }, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) - endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").ID() + endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").WithTenancy(tenancy).ID() - rtest.Resource(pbcatalog.WorkloadType, "test-workload"). - WithData(suite.T(), &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) + rtest.Resource(pbcatalog.WorkloadType, "test-workload"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID}) - require.NoError(suite.T(), err) + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID}) + require.NoError(suite.T(), err) - // Verify that the services status has been set to indicate endpoints are automatically managed. - suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged) + // Verify that the services status has been set to indicate endpoints are automatically managed. + suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged) - // The service endpoints metadata should include our tag to indcate it was generated by this controller - res := suite.client.RequireResourceMeta(suite.T(), endpointsID, endpointsMetaManagedBy, StatusKey) + // The service endpoints metadata should include our tag to indcate it was generated by this controller + res := suite.client.RequireResourceMeta(suite.T(), endpointsID, endpointsMetaManagedBy, StatusKey) - var endpoints pbcatalog.ServiceEndpoints - err = res.Data.UnmarshalTo(&endpoints) - require.NoError(suite.T(), err) - require.Len(suite.T(), endpoints.Endpoints, 1) + var endpoints pbcatalog.ServiceEndpoints + err = res.Data.UnmarshalTo(&endpoints) + require.NoError(suite.T(), err) + require.Len(suite.T(), endpoints.Endpoints, 1) + }) // We are not going to retest that the workloads to endpoints conversion process // The length check should be sufficient to prove the endpoints are being // converted. The unit tests for the workloadsToEndpoints functions prove that @@ -622,41 +644,46 @@ func (suite *controllerSuite) TestReconcile_Managed_ExistingEndpoints() { // This test's purpose is to ensure that when the current set of endpoints // differs from any prior set of endpoints that the resource gets rewritten. - service := rtest.Resource(pbcatalog.ServiceType, "test"). - WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Prefixes: []string{""}, - }, - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) - - endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). - WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). - WithOwner(service.Id). - Write(suite.T(), suite.client) - - rtest.Resource(pbcatalog.WorkloadType, "test-workload"). - WithData(suite.T(), &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + service := rtest.Resource(pbcatalog.ServiceType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{""}, + }, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) + + endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{}). + WithOwner(service.Id). + Write(suite.T(), suite.client) + + rtest.Resource(pbcatalog.WorkloadType, "test-workload"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) - err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) - require.NoError(suite.T(), err) + err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id}) + require.NoError(suite.T(), err) - suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged) - res := suite.client.RequireResourceMeta(suite.T(), endpoints.Id, endpointsMetaManagedBy, StatusKey) + suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged) + res := suite.client.RequireResourceMeta(suite.T(), endpoints.Id, endpointsMetaManagedBy, StatusKey) - var newEndpoints pbcatalog.ServiceEndpoints - err = res.Data.UnmarshalTo(&newEndpoints) - require.NoError(suite.T(), err) - require.Len(suite.T(), newEndpoints.Endpoints, 1) + var newEndpoints pbcatalog.ServiceEndpoints + err = res.Data.UnmarshalTo(&newEndpoints) + require.NoError(suite.T(), err) + require.Len(suite.T(), newEndpoints.Endpoints, 1) + }) } func (suite *controllerSuite) TestController() { @@ -673,184 +700,202 @@ func (suite *controllerSuite) TestController() { mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) - // Add a service - there are no workloads so an empty endpoints - // object should be created. - service := rtest.Resource(pbcatalog.ServiceType, "api"). - WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Prefixes: []string{"api-"}, - }, - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + // Add a service - there are no workloads so an empty endpoints + // object should be created. + service := rtest.Resource(pbcatalog.ServiceType, "api"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"api-"}, + }, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) + + // Wait for the controller to record that the endpoints are being managed + res := suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) + // Check that the services status was updated accordingly + rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionManaged) + rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionIdentitiesNotFound) + + // Check that the endpoints resource exists and contains 0 endpoints + endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "api").WithTenancy(tenancy).ID() + endpoints := suite.client.RequireResourceExists(suite.T(), endpointsID) + suite.requireEndpoints(endpoints) + + // Now add a workload that would be selected by the service. Leave + // the workload in a state where its health has not been reconciled + workload := rtest.Resource(pbcatalog.WorkloadType, "api-1"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + "grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, + }, + Identity: "api", + }). + Write(suite.T(), suite.client) + + suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, + ConditionIdentitiesFound([]string{"api"})) + + // Wait for the endpoints to be regenerated + endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) + + // Verify that the generated endpoints now contain the workload + suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ + TargetRef: workload.Id, + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "127.0.0.1", Ports: []string{"http"}}, }, - }). - Write(suite.T(), suite.client) - - // Wait for the controller to record that the endpoints are being managed - res := suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) - // Check that the services status was updated accordingly - rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionManaged) - rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionIdentitiesNotFound) - - // Check that the endpoints resource exists and contains 0 endpoints - endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "api").ID() - endpoints := suite.client.RequireResourceExists(suite.T(), endpointsID) - suite.requireEndpoints(endpoints) - - // Now add a workload that would be selected by the service. Leave - // the workload in a state where its health has not been reconciled - workload := rtest.Resource(pbcatalog.WorkloadType, "api-1"). - WithData(suite.T(), &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, Ports: map[string]*pbcatalog.WorkloadPort{ "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - "grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, }, - Identity: "api", - }). - Write(suite.T(), suite.client) - - suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, - ConditionIdentitiesFound([]string{"api"})) - - // Wait for the endpoints to be regenerated - endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) - - // Verify that the generated endpoints now contain the workload - suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ - TargetRef: workload.Id, - Addresses: []*pbcatalog.WorkloadAddress{ - {Host: "127.0.0.1", Ports: []string{"http"}}, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - HealthStatus: pbcatalog.Health_HEALTH_CRITICAL, - Identity: "api", - }) + HealthStatus: pbcatalog.Health_HEALTH_CRITICAL, + Identity: "api", + }) - // Update the health status of the workload - suite.client.WriteStatus(suite.ctx, &pbresource.WriteStatusRequest{ - Id: workload.Id, - Key: workloadhealth.StatusKey, - Status: &pbresource.Status{ - ObservedGeneration: workload.Generation, - Conditions: []*pbresource.Condition{ - { - Type: workloadhealth.StatusConditionHealthy, - State: pbresource.Condition_STATE_TRUE, - Reason: "HEALTH_PASSING", + // Update the health status of the workload + suite.client.WriteStatus(suite.ctx, &pbresource.WriteStatusRequest{ + Id: workload.Id, + Key: workloadhealth.StatusKey, + Status: &pbresource.Status{ + ObservedGeneration: workload.Generation, + Conditions: []*pbresource.Condition{ + { + Type: workloadhealth.StatusConditionHealthy, + State: pbresource.Condition_STATE_TRUE, + Reason: "HEALTH_PASSING", + }, }, }, - }, - }) - - // Wait for the endpoints to be regenerated - endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) + }) - // ensure the endpoint was put into the passing state - suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ - TargetRef: workload.Id, - Addresses: []*pbcatalog.WorkloadAddress{ - {Host: "127.0.0.1", Ports: []string{"http"}}, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - HealthStatus: pbcatalog.Health_HEALTH_PASSING, - Identity: "api", - }) + // Wait for the endpoints to be regenerated + endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) - // Update workload identity and check that the status on the service is updated - workload = rtest.Resource(pbcatalog.WorkloadType, "api-1"). - WithData(suite.T(), &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + // ensure the endpoint was put into the passing state + suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ + TargetRef: workload.Id, + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "127.0.0.1", Ports: []string{"http"}}, + }, Ports: map[string]*pbcatalog.WorkloadPort{ "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - "grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, }, - Identity: "endpoints-api-identity", - }). - Write(suite.T(), suite.client) - - suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, ConditionIdentitiesFound([]string{"endpoints-api-identity"})) - - // Verify that the generated endpoints now contain the workload - endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) - suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ - TargetRef: workload.Id, - Addresses: []*pbcatalog.WorkloadAddress{ - {Host: "127.0.0.1", Ports: []string{"http"}}, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - HealthStatus: pbcatalog.Health_HEALTH_PASSING, - Identity: "endpoints-api-identity", - }) + HealthStatus: pbcatalog.Health_HEALTH_PASSING, + Identity: "api", + }) - // rewrite the service to add more selection criteria. This should trigger - // reconciliation but shouldn't result in updating the endpoints because - // the actual list of currently selected workloads has not changed - rtest.Resource(pbcatalog.ServiceType, "api"). - WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Prefixes: []string{"api-"}, - Names: []string{"doesnt-matter"}, + // Update workload identity and check that the status on the service is updated + workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + "grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, + }, + Identity: "endpoints-api-identity", + }). + Write(suite.T(), suite.client) + + suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, ConditionIdentitiesFound([]string{"endpoints-api-identity"})) + + // Verify that the generated endpoints now contain the workload + endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) + suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ + TargetRef: workload.Id, + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "127.0.0.1", Ports: []string{"http"}}, }, - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, }, - }). - Write(suite.T(), suite.client) + HealthStatus: pbcatalog.Health_HEALTH_PASSING, + Identity: "endpoints-api-identity", + }) - // Wait for the service status' observed generation to get bumped - service = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) + // rewrite the service to add more selection criteria. This should trigger + // reconciliation but shouldn't result in updating the endpoints because + // the actual list of currently selected workloads has not changed + rtest.Resource(pbcatalog.ServiceType, "api").WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"api-"}, + Names: []string{"doesnt-matter"}, + }, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) - // Verify that the endpoints were not regenerated - suite.client.RequireVersionUnchanged(suite.T(), endpointsID, endpoints.Version) + // Wait for the service status' observed generation to get bumped + service = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) - // Update the service. - updatedService := rtest.Resource(pbcatalog.ServiceType, "api"). - WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Prefixes: []string{"api-"}, - }, - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - {TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, - }, - }). - Write(suite.T(), suite.client) + // Verify that the endpoints were not regenerated + suite.client.RequireVersionUnchanged(suite.T(), endpointsID, endpoints.Version) - // Wait for the endpoints to be regenerated - endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) - rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false) + // Update the service. + updatedService := rtest.Resource(pbcatalog.ServiceType, "api"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"api-"}, + }, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + {TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, + }, + }). + Write(suite.T(), suite.client) - // Delete the endpoints. The controller should bring these back momentarily - suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID}) + // Wait for the endpoints to be regenerated + endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) + rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false) - // Wait for controller to recreate the endpoints - retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceExists(r, endpointsID) - }) + // Delete the endpoints. The controller should bring these back momentarily + suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID}) - // Move the service to having unmanaged endpoints - rtest.Resource(pbcatalog.ServiceType, "api"). - WithData(suite.T(), &pbcatalog.Service{ - Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - }, - }). - Write(suite.T(), suite.client) + // Wait for controller to recreate the endpoints + retry.Run(suite.T(), func(r *retry.R) { + suite.client.RequireResourceExists(r, endpointsID) + }) - res = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) - rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionUnmanaged) + // Move the service to having unmanaged endpoints + rtest.Resource(pbcatalog.ServiceType, "api"). + WithTenancy(tenancy). + WithData(suite.T(), &pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }). + Write(suite.T(), suite.client) + + res = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey) + rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionUnmanaged) - // Verify that the endpoints were deleted - suite.client.RequireResourceNotFound(suite.T(), endpointsID) + // Verify that the endpoints were deleted + suite.client.RequireResourceNotFound(suite.T(), endpointsID) + }) } func TestController(t *testing.T) { suite.Run(t, new(controllerSuite)) } + +func (suite *controllerSuite) runTestCaseWithTenancies(testFunc func(*pbresource.Tenancy)) { + for _, tenancy := range suite.tenancies { + suite.Run(suite.appendTenancyInfo(tenancy), func() { + testFunc(tenancy) + }) + } +} + +func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string { + return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition) +} diff --git a/internal/catalog/internal/controllers/endpoints/reconciliation_data.go b/internal/catalog/internal/controllers/endpoints/reconciliation_data.go index 320ad47470..186354eda9 100644 --- a/internal/catalog/internal/controllers/endpoints/reconciliation_data.go +++ b/internal/catalog/internal/controllers/endpoints/reconciliation_data.go @@ -34,7 +34,7 @@ type workloadData struct { // getServiceData will read the service with the given ID and unmarshal the // Data field. The return value is a struct that contains the retrieved -// resource as well as the unmsashalled form. If the resource doesn't +// resource as well as the unmarshalled form. If the resource doesn't // exist, nil will be returned. Any other error either with retrieving // the resource or unmarshalling it will cause the error to be returned // to the caller diff --git a/internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go b/internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go index d855c710f6..96ddceb481 100644 --- a/internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go +++ b/internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go @@ -5,6 +5,7 @@ package endpoints import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -17,6 +18,7 @@ import ( "github.com/hashicorp/consul/internal/catalog/internal/types" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" rtest "github.com/hashicorp/consul/internal/resource/resourcetest" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" @@ -28,7 +30,7 @@ type reconciliationDataSuite struct { suite.Suite ctx context.Context - client pbresource.ResourceServiceClient + client *resourcetest.Client rt controller.Runtime apiServiceData *pbcatalog.Service @@ -41,11 +43,16 @@ type reconciliationDataSuite struct { api123Workload *pbresource.Resource web1Workload *pbresource.Resource web2Workload *pbresource.Resource + + tenancies []*pbresource.Tenancy } func (suite *reconciliationDataSuite) SetupTest() { suite.ctx = testutil.TestContext(suite.T()) - suite.client = svctest.RunResourceService(suite.T(), types.Register) + + suite.tenancies = rtest.TestTenancies() + resourceClient := svctest.RunResourceServiceWithTenancies(suite.T(), types.Register) + suite.client = resourcetest.NewClient(resourceClient) suite.rt = controller.Runtime{ Client: suite.client, Logger: testutil.Logger(suite.T()), @@ -67,16 +74,174 @@ func (suite *reconciliationDataSuite) SetupTest() { } suite.apiServiceSubsetData = proto.Clone(suite.apiServiceData).(*pbcatalog.Service) suite.apiServiceSubsetData.Workloads.Filter = "(zim in metadata) and (metadata.zim matches `^g.`)" +} + +func (suite *reconciliationDataSuite) TestGetServiceData_NotFound() { + // This test's purposes is to ensure that NotFound errors when retrieving + // the service data are ignored properly. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "not-found").WithTenancy(tenancy).ID()) + require.NoError(suite.T(), err) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetServiceData_ReadError() { + // This test's purpose is to ensure that Read errors other than NotFound + // are propagated back to the caller. Specifying a resource ID with an + // unregistered type is the easiest way to force a resource service error. + badType := &pbresource.Type{ + Group: "not", + Kind: "found", + GroupVersion: "vfake", + } + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").WithTenancy(tenancy).ID()) + require.Error(suite.T(), err) + require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetServiceData_UnmarshalError() { + // This test's purpose is to ensure that unmarshlling errors are returned + // to the caller. We are using a resource id that points to an endpoints + // object instead of a service to ensure that the data will be unmarshallable. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "api").WithTenancy(tenancy).ID()) + require.Error(suite.T(), err) + var parseErr resource.ErrDataParse + require.ErrorAs(suite.T(), err, &parseErr) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetServiceData_Ok() { + // This test's purpose is to ensure that the happy path for + // retrieving a service works as expected. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getServiceData(suite.ctx, suite.rt, suite.apiService.Id) + require.NoError(suite.T(), err) + require.NotNil(suite.T(), data) + require.NotNil(suite.T(), data.resource) + prototest.AssertDeepEqual(suite.T(), suite.apiService.Id, data.resource.Id) + require.Len(suite.T(), data.service.Ports, 1) + }) +} + +func (suite *reconciliationDataSuite) TestGetEndpointsData_NotFound() { + // This test's purposes is to ensure that NotFound errors when retrieving + // the endpoint data are ignored properly. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").WithTenancy(tenancy).ID()) + require.NoError(suite.T(), err) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetEndpointsData_ReadError() { + // This test's purpose is to ensure that Read errors other than NotFound + // are propagated back to the caller. Specifying a resource ID with an + // unregistered type is the easiest way to force a resource service error. + badType := &pbresource.Type{ + Group: "not", + Kind: "found", + GroupVersion: "vfake", + } + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").WithTenancy(tenancy).ID()) + require.Error(suite.T(), err) + require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetEndpointsData_UnmarshalError() { + // This test's purpose is to ensure that unmarshlling errors are returned + // to the caller. We are using a resource id that points to a service object + // instead of an endpoints object to ensure that the data will be unmarshallable. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "api").WithTenancy(tenancy).ID()) + require.Error(suite.T(), err) + var parseErr resource.ErrDataParse + require.ErrorAs(suite.T(), err, &parseErr) + require.Nil(suite.T(), data) + }) +} + +func (suite *reconciliationDataSuite) TestGetEndpointsData_Ok() { + // This test's purpose is to ensure that the happy path for + // retrieving an endpoints object works as expected. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + data, err := getEndpointsData(suite.ctx, suite.rt, suite.apiEndpoints.Id) + require.NoError(suite.T(), err) + require.NotNil(suite.T(), data) + require.NotNil(suite.T(), data.resource) + prototest.AssertDeepEqual(suite.T(), suite.apiEndpoints.Id, data.resource.Id) + require.Len(suite.T(), data.endpoints.Endpoints, 1) + }) +} + +func (suite *reconciliationDataSuite) TestGetWorkloadData() { + // This test's purpose is to ensure that gather workloads for + // a service work as expected. The services selector was crafted + // to exercise the deduplication behavior as well as the sorting + // behavior. The assertions in this test will verify that only + // unique workloads are returned and that they are ordered. + + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + require.NotNil(suite.T(), suite.apiService) + + data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{ + resource: suite.apiService, + service: suite.apiServiceData, + }) + + require.NoError(suite.T(), err) + require.Len(suite.T(), data, 5) + prototest.AssertDeepEqual(suite.T(), suite.api1Workload, data[0].resource) + prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[1].resource) + prototest.AssertDeepEqual(suite.T(), suite.api2Workload, data[2].resource) + prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[3].resource) + prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource) + }) +} + +func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() { + // This is like TestGetWorkloadData except it exercises the post-read + // filter on the selector. + suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) { + require.NotNil(suite.T(), suite.apiServiceSubset) + + data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{ + resource: suite.apiServiceSubset, + service: suite.apiServiceSubsetData, + }) + + require.NoError(suite.T(), err) + require.Len(suite.T(), data, 2) + prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource) + prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource) + }) +} + +func TestReconciliationData(t *testing.T) { + suite.Run(t, new(reconciliationDataSuite)) +} +func (suite *reconciliationDataSuite) setupResourcesWithTenancy(tenancy *pbresource.Tenancy) { suite.apiService = rtest.Resource(pbcatalog.ServiceType, "api"). + WithTenancy(tenancy). WithData(suite.T(), suite.apiServiceData). Write(suite.T(), suite.client) suite.apiServiceSubset = rtest.Resource(pbcatalog.ServiceType, "api-subset"). + WithTenancy(tenancy). WithData(suite.T(), suite.apiServiceSubsetData). Write(suite.T(), suite.client) suite.api1Workload = rtest.Resource(pbcatalog.WorkloadType, "api-1"). + WithTenancy(tenancy). WithMeta("zim", "dib"). WithData(suite.T(), &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ @@ -90,6 +255,7 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) suite.api2Workload = rtest.Resource(pbcatalog.WorkloadType, "api-2"). + WithTenancy(tenancy). WithData(suite.T(), &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ {Host: "127.0.0.1"}, @@ -102,6 +268,7 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) suite.api123Workload = rtest.Resource(pbcatalog.WorkloadType, "api-123"). + WithTenancy(tenancy). WithMeta("zim", "gir"). WithData(suite.T(), &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ @@ -115,6 +282,7 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) suite.web1Workload = rtest.Resource(pbcatalog.WorkloadType, "web-1"). + WithTenancy(tenancy). WithMeta("zim", "gaz"). WithData(suite.T(), &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ @@ -128,6 +296,7 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) suite.web2Workload = rtest.Resource(pbcatalog.WorkloadType, "web-2"). + WithTenancy(tenancy). WithData(suite.T(), &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ {Host: "127.0.0.1"}, @@ -140,10 +309,11 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) suite.apiEndpoints = rtest.Resource(pbcatalog.ServiceEndpointsType, "api"). + WithTenancy(tenancy). WithData(suite.T(), &pbcatalog.ServiceEndpoints{ Endpoints: []*pbcatalog.Endpoint{ { - TargetRef: rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(resource.DefaultNamespacedTenancy()).ID(), + TargetRef: rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(tenancy).ID(), Addresses: []*pbcatalog.WorkloadAddress{ { Host: "127.0.0.1", @@ -160,131 +330,27 @@ func (suite *reconciliationDataSuite) SetupTest() { Write(suite.T(), suite.client) } -func (suite *reconciliationDataSuite) TestGetServiceData_NotFound() { - // This test's purposes is to ensure that NotFound errors when retrieving - // the service data are ignored properly. - data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "not-found").WithTenancy(resource.DefaultNamespacedTenancy()).ID()) - require.NoError(suite.T(), err) - require.Nil(suite.T(), data) +func (suite *reconciliationDataSuite) cleanupResources() { + suite.client.MustDelete(suite.T(), suite.apiService.Id) + suite.client.MustDelete(suite.T(), suite.apiServiceSubset.Id) + suite.client.MustDelete(suite.T(), suite.api1Workload.Id) + suite.client.MustDelete(suite.T(), suite.api2Workload.Id) + suite.client.MustDelete(suite.T(), suite.api123Workload.Id) + suite.client.MustDelete(suite.T(), suite.web1Workload.Id) + suite.client.MustDelete(suite.T(), suite.web2Workload.Id) + suite.client.MustDelete(suite.T(), suite.apiEndpoints.Id) } -func (suite *reconciliationDataSuite) TestGetServiceData_ReadError() { - // This test's purpose is to ensure that Read errors other than NotFound - // are propagated back to the caller. Specifying a resource ID with an - // unregistered type is the easiest way to force a resource service error. - badType := &pbresource.Type{ - Group: "not", - Kind: "found", - GroupVersion: "vfake", +func (suite *reconciliationDataSuite) runTestCaseWithTenancies(testFunc func(*pbresource.Tenancy)) { + for _, tenancy := range suite.tenancies { + suite.Run(suite.appendTenancyInfo(tenancy), func() { + suite.setupResourcesWithTenancy(tenancy) + testFunc(tenancy) + suite.T().Cleanup(suite.cleanupResources) + }) } - data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").ID()) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - require.Nil(suite.T(), data) -} - -func (suite *reconciliationDataSuite) TestGetServiceData_UnmarshalError() { - // This test's purpose is to ensure that unmarshlling errors are returned - // to the caller. We are using a resource id that points to an endpoints - // object instead of a service to ensure that the data will be unmarshallable. - data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "api").ID()) - require.Error(suite.T(), err) - var parseErr resource.ErrDataParse - require.ErrorAs(suite.T(), err, &parseErr) - require.Nil(suite.T(), data) -} - -func (suite *reconciliationDataSuite) TestGetServiceData_Ok() { - // This test's purpose is to ensure that the happy path for - // retrieving a service works as expected. - data, err := getServiceData(suite.ctx, suite.rt, suite.apiService.Id) - require.NoError(suite.T(), err) - require.NotNil(suite.T(), data) - require.NotNil(suite.T(), data.resource) - prototest.AssertDeepEqual(suite.T(), suite.apiService.Id, data.resource.Id) - require.Len(suite.T(), data.service.Ports, 1) -} - -func (suite *reconciliationDataSuite) TestGetEndpointsData_NotFound() { - // This test's purposes is to ensure that NotFound errors when retrieving - // the endpoint data are ignored properly. - data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").ID()) - require.NoError(suite.T(), err) - require.Nil(suite.T(), data) } -func (suite *reconciliationDataSuite) TestGetEndpointsData_ReadError() { - // This test's purpose is to ensure that Read errors other than NotFound - // are propagated back to the caller. Specifying a resource ID with an - // unregistered type is the easiest way to force a resource service error. - badType := &pbresource.Type{ - Group: "not", - Kind: "found", - GroupVersion: "vfake", - } - data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").ID()) - require.Error(suite.T(), err) - require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) - require.Nil(suite.T(), data) -} - -func (suite *reconciliationDataSuite) TestGetEndpointsData_UnmarshalError() { - // This test's purpose is to ensure that unmarshlling errors are returned - // to the caller. We are using a resource id that points to a service object - // instead of an endpoints object to ensure that the data will be unmarshallable. - data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "api").ID()) - require.Error(suite.T(), err) - var parseErr resource.ErrDataParse - require.ErrorAs(suite.T(), err, &parseErr) - require.Nil(suite.T(), data) -} - -func (suite *reconciliationDataSuite) TestGetEndpointsData_Ok() { - // This test's purpose is to ensure that the happy path for - // retrieving an endpoints object works as expected. - data, err := getEndpointsData(suite.ctx, suite.rt, suite.apiEndpoints.Id) - require.NoError(suite.T(), err) - require.NotNil(suite.T(), data) - require.NotNil(suite.T(), data.resource) - prototest.AssertDeepEqual(suite.T(), suite.apiEndpoints.Id, data.resource.Id) - require.Len(suite.T(), data.endpoints.Endpoints, 1) -} - -func (suite *reconciliationDataSuite) TestGetWorkloadData() { - // This test's purpose is to ensure that gather workloads for - // a service work as expected. The services selector was crafted - // to exercise the deduplication behavior as well as the sorting - // behavior. The assertions in this test will verify that only - // unique workloads are returned and that they are ordered. - - data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{ - resource: suite.apiService, - service: suite.apiServiceData, - }) - - require.NoError(suite.T(), err) - require.Len(suite.T(), data, 5) - prototest.AssertDeepEqual(suite.T(), suite.api1Workload, data[0].resource) - prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[1].resource) - prototest.AssertDeepEqual(suite.T(), suite.api2Workload, data[2].resource) - prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[3].resource) - prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource) -} - -func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() { - // This is like TestGetWorkloadData except it exercises the post-read - // filter on the selector. - data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{ - resource: suite.apiServiceSubset, - service: suite.apiServiceSubsetData, - }) - - require.NoError(suite.T(), err) - require.Len(suite.T(), data, 2) - prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource) - prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource) -} - -func TestReconciliationData(t *testing.T) { - suite.Run(t, new(reconciliationDataSuite)) +func (suite *reconciliationDataSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string { + return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition) }