Browse Source

Added tenancy tests for endpoints controller (#19650)

pull/19628/head
Ganesh S 1 year ago committed by GitHub
parent
commit
2e28aecff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 601
      internal/catalog/internal/controllers/endpoints/controller_test.go
  2. 2
      internal/catalog/internal/controllers/endpoints/reconciliation_data.go
  3. 316
      internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go

601
internal/catalog/internal/controllers/endpoints/controller_test.go

@ -5,6 +5,7 @@ package endpoints
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -15,6 +16,7 @@ import (
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource/mappers/selectiontracker"
"github.com/hashicorp/consul/internal/resource/resourcetest"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
@ -441,11 +443,13 @@ type controllerSuite struct {
tracker *selectiontracker.WorkloadSelectionTracker
reconciler *serviceEndpointsReconciler
tenancies []*pbresource.Tenancy
}
func (suite *controllerSuite) SetupTest() {
suite.tenancies = resourcetest.TestTenancies()
suite.ctx = testutil.TestContext(suite.T())
client := svctest.RunResourceService(suite.T(), types.Register)
client := svctest.RunResourceServiceWithTenancies(suite.T(), types.Register)
suite.rt = controller.Runtime{
Client: client,
Logger: testutil.Logger(suite.T()),
@ -478,25 +482,28 @@ func (suite *controllerSuite) TestReconcile_ServiceNotFound() {
// generate a workload resource to use for checking if it maps
// to a service endpoints object
workload := rtest.Resource(pbcatalog.WorkloadType, "foo").Build()
// ensure that the tracker knows about the service prior to
// calling reconcile so that we can ensure it removes tracking
id := rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").ID()
suite.tracker.TrackIDForSelector(id, &pbcatalog.WorkloadSelector{Prefixes: []string{""}})
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
workload := rtest.Resource(pbcatalog.WorkloadType, "foo").WithTenancy(tenancy).Build()
// verify that mapping the workload to service endpoints returns a
// non-empty list prior to reconciliation which should remove the
// tracking.
suite.requireTracking(workload, id)
// ensure that the tracker knows about the service prior to
// calling reconcile so that we can ensure it removes tracking
id := rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").WithTenancy(tenancy).ID()
suite.tracker.TrackIDForSelector(id, &pbcatalog.WorkloadSelector{Prefixes: []string{""}})
// Because the endpoints don't exist, this reconcile call should
// cause tracking of the endpoints to be removed
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
require.NoError(suite.T(), err)
// verify that mapping the workload to service endpoints returns a
// non-empty list prior to reconciliation which should remove the
// tracking.
suite.requireTracking(workload, id)
// Because the endpoints don't exist, this reconcile call should
// cause tracking of the endpoints to be removed
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
require.NoError(suite.T(), err)
// Now ensure that the tracking was removed
suite.requireTracking(workload)
// Now ensure that the tracking was removed
suite.requireTracking(workload)
})
}
func (suite *controllerSuite) TestReconcile_NoSelector_NoEndpoints() {
@ -505,20 +512,23 @@ func (suite *controllerSuite) TestReconcile_NoSelector_NoEndpoints() {
// managed. Additionally, with no endpoints pre-existing it will
// not attempt to delete them.
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").ID()
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").WithTenancy(tenancy).ID()
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID})
require.NoError(suite.T(), err)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID})
require.NoError(suite.T(), err)
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
})
}
func (suite *controllerSuite) TestReconcile_NoSelector_ManagedEndpoints() {
@ -526,26 +536,30 @@ func (suite *controllerSuite) TestReconcile_NoSelector_ManagedEndpoints() {
// to unmanaged endpoints for a service, any already generated managed endpoints
// get deleted.
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
// this marks these endpoints as under management
WithMeta(endpointsMetaManagedBy, StatusKey).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
// the status should indicate the services endpoints are not being managed
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
// endpoints under management should be deleted
suite.client.RequireResourceNotFound(suite.T(), endpoints.Id)
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
// this marks these endpoints as under management
WithMeta(endpointsMetaManagedBy, StatusKey).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
// the status should indicate the services endpoints are not being managed
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
// endpoints under management should be deleted
suite.client.RequireResourceNotFound(suite.T(), endpoints.Id)
})
}
func (suite *controllerSuite) TestReconcile_NoSelector_UnmanagedEndpoints() {
@ -553,65 +567,73 @@ func (suite *controllerSuite) TestReconcile_NoSelector_UnmanagedEndpoints() {
// doesn't have its endpoints managed, that we do not delete any unmanaged
// ServiceEndpoints resource that the user would have manually written.
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
// the status should indicate the services endpoints are not being managed
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
// unmanaged endpoints should not be deleted when the service is unmanaged
suite.client.RequireResourceExists(suite.T(), endpoints.Id)
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
// the status should indicate the services endpoints are not being managed
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionUnmanaged)
// unmanaged endpoints should not be deleted when the service is unmanaged
suite.client.RequireResourceExists(suite.T(), endpoints.Id)
})
}
func (suite *controllerSuite) TestReconcile_Managed_NoPreviousEndpoints() {
// This test's purpose is to ensure the managed endpoint generation occurs
// as expected when there are no pre-existing endpoints.
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{""},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{""},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").ID()
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").WithTenancy(tenancy).ID()
rtest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
rtest.Resource(pbcatalog.WorkloadType, "test-workload").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID})
require.NoError(suite.T(), err)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpointsID})
require.NoError(suite.T(), err)
// Verify that the services status has been set to indicate endpoints are automatically managed.
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged)
// Verify that the services status has been set to indicate endpoints are automatically managed.
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged)
// The service endpoints metadata should include our tag to indcate it was generated by this controller
res := suite.client.RequireResourceMeta(suite.T(), endpointsID, endpointsMetaManagedBy, StatusKey)
// The service endpoints metadata should include our tag to indcate it was generated by this controller
res := suite.client.RequireResourceMeta(suite.T(), endpointsID, endpointsMetaManagedBy, StatusKey)
var endpoints pbcatalog.ServiceEndpoints
err = res.Data.UnmarshalTo(&endpoints)
require.NoError(suite.T(), err)
require.Len(suite.T(), endpoints.Endpoints, 1)
var endpoints pbcatalog.ServiceEndpoints
err = res.Data.UnmarshalTo(&endpoints)
require.NoError(suite.T(), err)
require.Len(suite.T(), endpoints.Endpoints, 1)
})
// We are not going to retest that the workloads to endpoints conversion process
// The length check should be sufficient to prove the endpoints are being
// converted. The unit tests for the workloadsToEndpoints functions prove that
@ -622,41 +644,46 @@ func (suite *controllerSuite) TestReconcile_Managed_ExistingEndpoints() {
// This test's purpose is to ensure that when the current set of endpoints
// differs from any prior set of endpoints that the resource gets rewritten.
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{""},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
WithOwner(service.Id).
Write(suite.T(), suite.client)
rtest.Resource(pbcatalog.WorkloadType, "test-workload").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
service := rtest.Resource(pbcatalog.ServiceType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{""},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
endpoints := rtest.Resource(pbcatalog.ServiceEndpointsType, "test").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.ServiceEndpoints{}).
WithOwner(service.Id).
Write(suite.T(), suite.client)
rtest.Resource(pbcatalog.WorkloadType, "test-workload").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
err := suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: endpoints.Id})
require.NoError(suite.T(), err)
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged)
res := suite.client.RequireResourceMeta(suite.T(), endpoints.Id, endpointsMetaManagedBy, StatusKey)
suite.client.RequireStatusCondition(suite.T(), service.Id, StatusKey, ConditionManaged)
res := suite.client.RequireResourceMeta(suite.T(), endpoints.Id, endpointsMetaManagedBy, StatusKey)
var newEndpoints pbcatalog.ServiceEndpoints
err = res.Data.UnmarshalTo(&newEndpoints)
require.NoError(suite.T(), err)
require.Len(suite.T(), newEndpoints.Endpoints, 1)
var newEndpoints pbcatalog.ServiceEndpoints
err = res.Data.UnmarshalTo(&newEndpoints)
require.NoError(suite.T(), err)
require.Len(suite.T(), newEndpoints.Endpoints, 1)
})
}
func (suite *controllerSuite) TestController() {
@ -673,184 +700,202 @@ func (suite *controllerSuite) TestController() {
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
// Add a service - there are no workloads so an empty endpoints
// object should be created.
service := rtest.Resource(pbcatalog.ServiceType, "api").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
// Add a service - there are no workloads so an empty endpoints
// object should be created.
service := rtest.Resource(pbcatalog.ServiceType, "api").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
// Wait for the controller to record that the endpoints are being managed
res := suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
// Check that the services status was updated accordingly
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionManaged)
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionIdentitiesNotFound)
// Check that the endpoints resource exists and contains 0 endpoints
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "api").WithTenancy(tenancy).ID()
endpoints := suite.client.RequireResourceExists(suite.T(), endpointsID)
suite.requireEndpoints(endpoints)
// Now add a workload that would be selected by the service. Leave
// the workload in a state where its health has not been reconciled
workload := rtest.Resource(pbcatalog.WorkloadType, "api-1").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
"grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
Identity: "api",
}).
Write(suite.T(), suite.client)
suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey,
ConditionIdentitiesFound([]string{"api"}))
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
// Verify that the generated endpoints now contain the workload
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
}).
Write(suite.T(), suite.client)
// Wait for the controller to record that the endpoints are being managed
res := suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
// Check that the services status was updated accordingly
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionManaged)
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionIdentitiesNotFound)
// Check that the endpoints resource exists and contains 0 endpoints
endpointsID := rtest.Resource(pbcatalog.ServiceEndpointsType, "api").ID()
endpoints := suite.client.RequireResourceExists(suite.T(), endpointsID)
suite.requireEndpoints(endpoints)
// Now add a workload that would be selected by the service. Leave
// the workload in a state where its health has not been reconciled
workload := rtest.Resource(pbcatalog.WorkloadType, "api-1").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
"grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
Identity: "api",
}).
Write(suite.T(), suite.client)
suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey,
ConditionIdentitiesFound([]string{"api"}))
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
// Verify that the generated endpoints now contain the workload
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
HealthStatus: pbcatalog.Health_HEALTH_CRITICAL,
Identity: "api",
})
HealthStatus: pbcatalog.Health_HEALTH_CRITICAL,
Identity: "api",
})
// Update the health status of the workload
suite.client.WriteStatus(suite.ctx, &pbresource.WriteStatusRequest{
Id: workload.Id,
Key: workloadhealth.StatusKey,
Status: &pbresource.Status{
ObservedGeneration: workload.Generation,
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
State: pbresource.Condition_STATE_TRUE,
Reason: "HEALTH_PASSING",
// Update the health status of the workload
suite.client.WriteStatus(suite.ctx, &pbresource.WriteStatusRequest{
Id: workload.Id,
Key: workloadhealth.StatusKey,
Status: &pbresource.Status{
ObservedGeneration: workload.Generation,
Conditions: []*pbresource.Condition{
{
Type: workloadhealth.StatusConditionHealthy,
State: pbresource.Condition_STATE_TRUE,
Reason: "HEALTH_PASSING",
},
},
},
},
})
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
})
// ensure the endpoint was put into the passing state
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
Identity: "api",
})
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
// Update workload identity and check that the status on the service is updated
workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
// ensure the endpoint was put into the passing state
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
"grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
Identity: "endpoints-api-identity",
}).
Write(suite.T(), suite.client)
suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, ConditionIdentitiesFound([]string{"endpoints-api-identity"}))
// Verify that the generated endpoints now contain the workload
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
Identity: "endpoints-api-identity",
})
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
Identity: "api",
})
// rewrite the service to add more selection criteria. This should trigger
// reconciliation but shouldn't result in updating the endpoints because
// the actual list of currently selected workloads has not changed
rtest.Resource(pbcatalog.ServiceType, "api").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
Names: []string{"doesnt-matter"},
// Update workload identity and check that the status on the service is updated
workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
"grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
Identity: "endpoints-api-identity",
}).
Write(suite.T(), suite.client)
suite.client.WaitForStatusCondition(suite.T(), service.Id, StatusKey, ConditionIdentitiesFound([]string{"endpoints-api-identity"}))
// Verify that the generated endpoints now contain the workload
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"http"}},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
Identity: "endpoints-api-identity",
})
// Wait for the service status' observed generation to get bumped
service = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
// rewrite the service to add more selection criteria. This should trigger
// reconciliation but shouldn't result in updating the endpoints because
// the actual list of currently selected workloads has not changed
rtest.Resource(pbcatalog.ServiceType, "api").WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
Names: []string{"doesnt-matter"},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
// Verify that the endpoints were not regenerated
suite.client.RequireVersionUnchanged(suite.T(), endpointsID, endpoints.Version)
// Wait for the service status' observed generation to get bumped
service = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
// Update the service.
updatedService := rtest.Resource(pbcatalog.ServiceType, "api").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
{TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
}).
Write(suite.T(), suite.client)
// Verify that the endpoints were not regenerated
suite.client.RequireVersionUnchanged(suite.T(), endpointsID, endpoints.Version)
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false)
// Update the service.
updatedService := rtest.Resource(pbcatalog.ServiceType, "api").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"api-"},
},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
{TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
}).
Write(suite.T(), suite.client)
// Delete the endpoints. The controller should bring these back momentarily
suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID})
// Wait for the endpoints to be regenerated
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false)
// Wait for controller to recreate the endpoints
retry.Run(suite.T(), func(r *retry.R) {
suite.client.RequireResourceExists(r, endpointsID)
})
// Delete the endpoints. The controller should bring these back momentarily
suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID})
// Move the service to having unmanaged endpoints
rtest.Resource(pbcatalog.ServiceType, "api").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
// Wait for controller to recreate the endpoints
retry.Run(suite.T(), func(r *retry.R) {
suite.client.RequireResourceExists(r, endpointsID)
})
res = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionUnmanaged)
// Move the service to having unmanaged endpoints
rtest.Resource(pbcatalog.ServiceType, "api").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
}).
Write(suite.T(), suite.client)
res = suite.client.WaitForReconciliation(suite.T(), service.Id, StatusKey)
rtest.RequireStatusCondition(suite.T(), res, StatusKey, ConditionUnmanaged)
// Verify that the endpoints were deleted
suite.client.RequireResourceNotFound(suite.T(), endpointsID)
// Verify that the endpoints were deleted
suite.client.RequireResourceNotFound(suite.T(), endpointsID)
})
}
func TestController(t *testing.T) {
suite.Run(t, new(controllerSuite))
}
func (suite *controllerSuite) runTestCaseWithTenancies(testFunc func(*pbresource.Tenancy)) {
for _, tenancy := range suite.tenancies {
suite.Run(suite.appendTenancyInfo(tenancy), func() {
testFunc(tenancy)
})
}
}
func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string {
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
}

2
internal/catalog/internal/controllers/endpoints/reconciliation_data.go

@ -34,7 +34,7 @@ type workloadData struct {
// getServiceData will read the service with the given ID and unmarshal the
// Data field. The return value is a struct that contains the retrieved
// resource as well as the unmsashalled form. If the resource doesn't
// resource as well as the unmarshalled form. If the resource doesn't
// exist, nil will be returned. Any other error either with retrieving
// the resource or unmarshalling it will cause the error to be returned
// to the caller

316
internal/catalog/internal/controllers/endpoints/reconciliation_data_test.go

@ -5,6 +5,7 @@ package endpoints
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -17,6 +18,7 @@ import (
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
@ -28,7 +30,7 @@ type reconciliationDataSuite struct {
suite.Suite
ctx context.Context
client pbresource.ResourceServiceClient
client *resourcetest.Client
rt controller.Runtime
apiServiceData *pbcatalog.Service
@ -41,11 +43,16 @@ type reconciliationDataSuite struct {
api123Workload *pbresource.Resource
web1Workload *pbresource.Resource
web2Workload *pbresource.Resource
tenancies []*pbresource.Tenancy
}
func (suite *reconciliationDataSuite) SetupTest() {
suite.ctx = testutil.TestContext(suite.T())
suite.client = svctest.RunResourceService(suite.T(), types.Register)
suite.tenancies = rtest.TestTenancies()
resourceClient := svctest.RunResourceServiceWithTenancies(suite.T(), types.Register)
suite.client = resourcetest.NewClient(resourceClient)
suite.rt = controller.Runtime{
Client: suite.client,
Logger: testutil.Logger(suite.T()),
@ -67,16 +74,174 @@ func (suite *reconciliationDataSuite) SetupTest() {
}
suite.apiServiceSubsetData = proto.Clone(suite.apiServiceData).(*pbcatalog.Service)
suite.apiServiceSubsetData.Workloads.Filter = "(zim in metadata) and (metadata.zim matches `^g.`)"
}
func (suite *reconciliationDataSuite) TestGetServiceData_NotFound() {
// This test's purposes is to ensure that NotFound errors when retrieving
// the service data are ignored properly.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "not-found").WithTenancy(tenancy).ID())
require.NoError(suite.T(), err)
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetServiceData_ReadError() {
// This test's purpose is to ensure that Read errors other than NotFound
// are propagated back to the caller. Specifying a resource ID with an
// unregistered type is the easiest way to force a resource service error.
badType := &pbresource.Type{
Group: "not",
Kind: "found",
GroupVersion: "vfake",
}
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").WithTenancy(tenancy).ID())
require.Error(suite.T(), err)
require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetServiceData_UnmarshalError() {
// This test's purpose is to ensure that unmarshlling errors are returned
// to the caller. We are using a resource id that points to an endpoints
// object instead of a service to ensure that the data will be unmarshallable.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "api").WithTenancy(tenancy).ID())
require.Error(suite.T(), err)
var parseErr resource.ErrDataParse
require.ErrorAs(suite.T(), err, &parseErr)
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetServiceData_Ok() {
// This test's purpose is to ensure that the happy path for
// retrieving a service works as expected.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getServiceData(suite.ctx, suite.rt, suite.apiService.Id)
require.NoError(suite.T(), err)
require.NotNil(suite.T(), data)
require.NotNil(suite.T(), data.resource)
prototest.AssertDeepEqual(suite.T(), suite.apiService.Id, data.resource.Id)
require.Len(suite.T(), data.service.Ports, 1)
})
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_NotFound() {
// This test's purposes is to ensure that NotFound errors when retrieving
// the endpoint data are ignored properly.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").WithTenancy(tenancy).ID())
require.NoError(suite.T(), err)
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_ReadError() {
// This test's purpose is to ensure that Read errors other than NotFound
// are propagated back to the caller. Specifying a resource ID with an
// unregistered type is the easiest way to force a resource service error.
badType := &pbresource.Type{
Group: "not",
Kind: "found",
GroupVersion: "vfake",
}
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").WithTenancy(tenancy).ID())
require.Error(suite.T(), err)
require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_UnmarshalError() {
// This test's purpose is to ensure that unmarshlling errors are returned
// to the caller. We are using a resource id that points to a service object
// instead of an endpoints object to ensure that the data will be unmarshallable.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "api").WithTenancy(tenancy).ID())
require.Error(suite.T(), err)
var parseErr resource.ErrDataParse
require.ErrorAs(suite.T(), err, &parseErr)
require.Nil(suite.T(), data)
})
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_Ok() {
// This test's purpose is to ensure that the happy path for
// retrieving an endpoints object works as expected.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
data, err := getEndpointsData(suite.ctx, suite.rt, suite.apiEndpoints.Id)
require.NoError(suite.T(), err)
require.NotNil(suite.T(), data)
require.NotNil(suite.T(), data.resource)
prototest.AssertDeepEqual(suite.T(), suite.apiEndpoints.Id, data.resource.Id)
require.Len(suite.T(), data.endpoints.Endpoints, 1)
})
}
func (suite *reconciliationDataSuite) TestGetWorkloadData() {
// This test's purpose is to ensure that gather workloads for
// a service work as expected. The services selector was crafted
// to exercise the deduplication behavior as well as the sorting
// behavior. The assertions in this test will verify that only
// unique workloads are returned and that they are ordered.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
require.NotNil(suite.T(), suite.apiService)
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
resource: suite.apiService,
service: suite.apiServiceData,
})
require.NoError(suite.T(), err)
require.Len(suite.T(), data, 5)
prototest.AssertDeepEqual(suite.T(), suite.api1Workload, data[0].resource)
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[1].resource)
prototest.AssertDeepEqual(suite.T(), suite.api2Workload, data[2].resource)
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[3].resource)
prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource)
})
}
func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() {
// This is like TestGetWorkloadData except it exercises the post-read
// filter on the selector.
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
require.NotNil(suite.T(), suite.apiServiceSubset)
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
resource: suite.apiServiceSubset,
service: suite.apiServiceSubsetData,
})
require.NoError(suite.T(), err)
require.Len(suite.T(), data, 2)
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource)
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource)
})
}
func TestReconciliationData(t *testing.T) {
suite.Run(t, new(reconciliationDataSuite))
}
func (suite *reconciliationDataSuite) setupResourcesWithTenancy(tenancy *pbresource.Tenancy) {
suite.apiService = rtest.Resource(pbcatalog.ServiceType, "api").
WithTenancy(tenancy).
WithData(suite.T(), suite.apiServiceData).
Write(suite.T(), suite.client)
suite.apiServiceSubset = rtest.Resource(pbcatalog.ServiceType, "api-subset").
WithTenancy(tenancy).
WithData(suite.T(), suite.apiServiceSubsetData).
Write(suite.T(), suite.client)
suite.api1Workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").
WithTenancy(tenancy).
WithMeta("zim", "dib").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
@ -90,6 +255,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
suite.api2Workload = rtest.Resource(pbcatalog.WorkloadType, "api-2").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1"},
@ -102,6 +268,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
suite.api123Workload = rtest.Resource(pbcatalog.WorkloadType, "api-123").
WithTenancy(tenancy).
WithMeta("zim", "gir").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
@ -115,6 +282,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
suite.web1Workload = rtest.Resource(pbcatalog.WorkloadType, "web-1").
WithTenancy(tenancy).
WithMeta("zim", "gaz").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
@ -128,6 +296,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
suite.web2Workload = rtest.Resource(pbcatalog.WorkloadType, "web-2").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1"},
@ -140,10 +309,11 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
suite.apiEndpoints = rtest.Resource(pbcatalog.ServiceEndpointsType, "api").
WithTenancy(tenancy).
WithData(suite.T(), &pbcatalog.ServiceEndpoints{
Endpoints: []*pbcatalog.Endpoint{
{
TargetRef: rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(resource.DefaultNamespacedTenancy()).ID(),
TargetRef: rtest.Resource(pbcatalog.WorkloadType, "api-1").WithTenancy(tenancy).ID(),
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: "127.0.0.1",
@ -160,131 +330,27 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)
}
func (suite *reconciliationDataSuite) TestGetServiceData_NotFound() {
// This test's purposes is to ensure that NotFound errors when retrieving
// the service data are ignored properly.
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "not-found").WithTenancy(resource.DefaultNamespacedTenancy()).ID())
require.NoError(suite.T(), err)
require.Nil(suite.T(), data)
func (suite *reconciliationDataSuite) cleanupResources() {
suite.client.MustDelete(suite.T(), suite.apiService.Id)
suite.client.MustDelete(suite.T(), suite.apiServiceSubset.Id)
suite.client.MustDelete(suite.T(), suite.api1Workload.Id)
suite.client.MustDelete(suite.T(), suite.api2Workload.Id)
suite.client.MustDelete(suite.T(), suite.api123Workload.Id)
suite.client.MustDelete(suite.T(), suite.web1Workload.Id)
suite.client.MustDelete(suite.T(), suite.web2Workload.Id)
suite.client.MustDelete(suite.T(), suite.apiEndpoints.Id)
}
func (suite *reconciliationDataSuite) TestGetServiceData_ReadError() {
// This test's purpose is to ensure that Read errors other than NotFound
// are propagated back to the caller. Specifying a resource ID with an
// unregistered type is the easiest way to force a resource service error.
badType := &pbresource.Type{
Group: "not",
Kind: "found",
GroupVersion: "vfake",
func (suite *reconciliationDataSuite) runTestCaseWithTenancies(testFunc func(*pbresource.Tenancy)) {
for _, tenancy := range suite.tenancies {
suite.Run(suite.appendTenancyInfo(tenancy), func() {
suite.setupResourcesWithTenancy(tenancy)
testFunc(tenancy)
suite.T().Cleanup(suite.cleanupResources)
})
}
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").ID())
require.Error(suite.T(), err)
require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
require.Nil(suite.T(), data)
}
func (suite *reconciliationDataSuite) TestGetServiceData_UnmarshalError() {
// This test's purpose is to ensure that unmarshlling errors are returned
// to the caller. We are using a resource id that points to an endpoints
// object instead of a service to ensure that the data will be unmarshallable.
data, err := getServiceData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "api").ID())
require.Error(suite.T(), err)
var parseErr resource.ErrDataParse
require.ErrorAs(suite.T(), err, &parseErr)
require.Nil(suite.T(), data)
}
func (suite *reconciliationDataSuite) TestGetServiceData_Ok() {
// This test's purpose is to ensure that the happy path for
// retrieving a service works as expected.
data, err := getServiceData(suite.ctx, suite.rt, suite.apiService.Id)
require.NoError(suite.T(), err)
require.NotNil(suite.T(), data)
require.NotNil(suite.T(), data.resource)
prototest.AssertDeepEqual(suite.T(), suite.apiService.Id, data.resource.Id)
require.Len(suite.T(), data.service.Ports, 1)
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_NotFound() {
// This test's purposes is to ensure that NotFound errors when retrieving
// the endpoint data are ignored properly.
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceEndpointsType, "not-found").ID())
require.NoError(suite.T(), err)
require.Nil(suite.T(), data)
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_ReadError() {
// This test's purpose is to ensure that Read errors other than NotFound
// are propagated back to the caller. Specifying a resource ID with an
// unregistered type is the easiest way to force a resource service error.
badType := &pbresource.Type{
Group: "not",
Kind: "found",
GroupVersion: "vfake",
}
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(badType, "foo").ID())
require.Error(suite.T(), err)
require.Equal(suite.T(), codes.InvalidArgument, status.Code(err))
require.Nil(suite.T(), data)
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_UnmarshalError() {
// This test's purpose is to ensure that unmarshlling errors are returned
// to the caller. We are using a resource id that points to a service object
// instead of an endpoints object to ensure that the data will be unmarshallable.
data, err := getEndpointsData(suite.ctx, suite.rt, rtest.Resource(pbcatalog.ServiceType, "api").ID())
require.Error(suite.T(), err)
var parseErr resource.ErrDataParse
require.ErrorAs(suite.T(), err, &parseErr)
require.Nil(suite.T(), data)
}
func (suite *reconciliationDataSuite) TestGetEndpointsData_Ok() {
// This test's purpose is to ensure that the happy path for
// retrieving an endpoints object works as expected.
data, err := getEndpointsData(suite.ctx, suite.rt, suite.apiEndpoints.Id)
require.NoError(suite.T(), err)
require.NotNil(suite.T(), data)
require.NotNil(suite.T(), data.resource)
prototest.AssertDeepEqual(suite.T(), suite.apiEndpoints.Id, data.resource.Id)
require.Len(suite.T(), data.endpoints.Endpoints, 1)
}
func (suite *reconciliationDataSuite) TestGetWorkloadData() {
// This test's purpose is to ensure that gather workloads for
// a service work as expected. The services selector was crafted
// to exercise the deduplication behavior as well as the sorting
// behavior. The assertions in this test will verify that only
// unique workloads are returned and that they are ordered.
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
resource: suite.apiService,
service: suite.apiServiceData,
})
require.NoError(suite.T(), err)
require.Len(suite.T(), data, 5)
prototest.AssertDeepEqual(suite.T(), suite.api1Workload, data[0].resource)
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[1].resource)
prototest.AssertDeepEqual(suite.T(), suite.api2Workload, data[2].resource)
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[3].resource)
prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource)
}
func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() {
// This is like TestGetWorkloadData except it exercises the post-read
// filter on the selector.
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
resource: suite.apiServiceSubset,
service: suite.apiServiceSubsetData,
})
require.NoError(suite.T(), err)
require.Len(suite.T(), data, 2)
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource)
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource)
}
func TestReconciliationData(t *testing.T) {
suite.Run(t, new(reconciliationDataSuite))
func (suite *reconciliationDataSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string {
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
}

Loading…
Cancel
Save