NET-6776 - Update Routes controller to use ComputedFailoverPolicy CE (#20496)

Update Routes controller to use ComputedFailoverPolicy
pull/20345/head^2
Tauhid Anjum 2024-02-06 13:28:18 +05:30 committed by GitHub
parent 0c509a60a4
commit 88b8a1cc36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 523 additions and 210 deletions

View File

@ -32,7 +32,7 @@ flowchart TD
mesh/v2beta1/computedexplicitdestinations --> mesh/v2beta1/destinations
mesh/v2beta1/computedproxyconfiguration --> catalog/v2beta1/workload
mesh/v2beta1/computedproxyconfiguration --> mesh/v2beta1/proxyconfiguration
mesh/v2beta1/computedroutes --> catalog/v2beta1/failoverpolicy
mesh/v2beta1/computedroutes --> catalog/v2beta1/computedfailoverpolicy
mesh/v2beta1/computedroutes --> catalog/v2beta1/service
mesh/v2beta1/computedroutes --> mesh/v2beta1/destinationpolicy
mesh/v2beta1/computedroutes --> mesh/v2beta1/grpcroute

View File

@ -22,11 +22,11 @@ import (
)
const (
failoverDestRefsIndexName = "destination-refs"
computedFailoverDestRefsIndexName = "destination-refs"
)
func resolveFailoverDestRefs(_ context.Context, rt controller.Runtime, id *pbresource.ID) ([]*pbresource.ID, error) {
iter, err := rt.Cache.ListIterator(pbcatalog.FailoverPolicyType, failoverDestRefsIndexName, id)
func resolveComputedFailoverDestRefs(_ context.Context, rt controller.Runtime, id *pbresource.ID) ([]*pbresource.ID, error) {
iter, err := rt.Cache.ListIterator(pbcatalog.ComputedFailoverPolicyType, computedFailoverDestRefsIndexName, id)
if err != nil {
return nil, err
}
@ -40,11 +40,11 @@ func resolveFailoverDestRefs(_ context.Context, rt controller.Runtime, id *pbres
}
func Controller() *controller.Controller {
failoverDestRefsIndex := indexers.RefOrIDIndex(failoverDestRefsIndexName, func(dec *resource.DecodedResource[*pbcatalog.FailoverPolicy]) []*pbresource.Reference {
computedFailoverDestRefsIndex := indexers.RefOrIDIndex(computedFailoverDestRefsIndexName, func(dec *resource.DecodedResource[*pbcatalog.ComputedFailoverPolicy]) []*pbresource.Reference {
return dec.Data.GetUnderlyingDestinationRefs()
})
mapper := xroutemapper.New(resolveFailoverDestRefs)
mapper := xroutemapper.New(resolveComputedFailoverDestRefs)
r := &routesReconciler{
mapper: mapper,
@ -54,7 +54,7 @@ func Controller() *controller.Controller {
WithWatch(pbmesh.GRPCRouteType, mapper.MapGRPCRoute).
WithWatch(pbmesh.TCPRouteType, mapper.MapTCPRoute).
WithWatch(pbmesh.DestinationPolicyType, mapper.MapServiceNameAligned).
WithWatch(pbcatalog.FailoverPolicyType, mapper.MapServiceNameAligned, failoverDestRefsIndex).
WithWatch(pbcatalog.ComputedFailoverPolicyType, mapper.MapServiceNameAligned, computedFailoverDestRefsIndex).
WithWatch(pbcatalog.ServiceType, mapper.MapService).
WithReconciler(r)
}

View File

@ -1347,6 +1347,270 @@ func (suite *controllerSuite) TestController() {
})
}
func (suite *controllerSuite) TestController_Failover() {
for _, tenancy := range suite.tenancies {
computedRoutesID := rtest.Resource(pbmesh.ComputedRoutesType, "api").
WithTenancy(tenancy).
ID()
apiServiceRef := resource.Reference(rtest.Resource(pbcatalog.ServiceType, "api").WithTenancy(tenancy).ID(), "")
apiServiceData := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
{TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH}},
}
_ = rtest.Resource(pbcatalog.ServiceType, "api").
WithTenancy(tenancy).
WithData(suite.T(), apiServiceData).
Write(suite.T(), suite.client)
var lastVersion string
testutil.RunStep(suite.T(), "default http route without failover policy", func(t *testing.T) {
// Check that the computed routes resource exists and it has one port that is the default.
expect := &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
apiServiceRef,
},
PortedConfigs: map[string]*pbmesh.ComputedPortRoutes{
"http": {
UsingDefaultConfig: true,
Config: &pbmesh.ComputedPortRoutes_Http{
Http: &pbmesh.ComputedHTTPRoute{
Rules: []*pbmesh.ComputedHTTPRouteRule{{
Matches: []*pbmesh.HTTPRouteMatch{{
Path: &pbmesh.HTTPPathMatch{
Type: pbmesh.PathMatchType_PATH_MATCH_TYPE_PREFIX,
Value: "/",
},
}},
BackendRefs: []*pbmesh.ComputedHTTPBackendRef{{
BackendTarget: backendName("api", "http", apiServiceRef.Tenancy),
}},
}},
},
},
ParentRef: newParentRef(apiServiceRef, "http"),
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
Targets: map[string]*pbmesh.BackendTargetDetails{
backendName("api", "http", apiServiceRef.Tenancy): {
Type: pbmesh.BackendTargetDetailsType_BACKEND_TARGET_DETAILS_TYPE_DIRECT,
MeshPort: "mesh",
BackendRef: newBackendRef(apiServiceRef, "http", ""),
DestinationConfig: defaultDestConfig(),
},
},
},
},
}
lastVersion = requireNewComputedRoutesVersion(t, suite.client, computedRoutesID, lastVersion, expect)
})
failoverPolicyData := &pbcatalog.ComputedFailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{
{
Ref: apiServiceRef,
Port: "http",
},
},
},
},
BoundReferences: []*pbresource.Reference{apiServiceRef},
}
_ = rtest.Resource(pbcatalog.ComputedFailoverPolicyType, "api").
WithTenancy(tenancy).
WithData(suite.T(), failoverPolicyData).
Write(suite.T(), suite.client)
testutil.RunStep(suite.T(), "with failover policy", func(t *testing.T) {
// Check that the computed routes resource exists and it has one port that is the default.
expect := &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
newRef(pbcatalog.ComputedFailoverPolicyType, "api", tenancy),
apiServiceRef,
},
PortedConfigs: map[string]*pbmesh.ComputedPortRoutes{
"http": {
UsingDefaultConfig: true,
Config: &pbmesh.ComputedPortRoutes_Http{
Http: &pbmesh.ComputedHTTPRoute{
Rules: []*pbmesh.ComputedHTTPRouteRule{{
Matches: []*pbmesh.HTTPRouteMatch{{
Path: &pbmesh.HTTPPathMatch{
Type: pbmesh.PathMatchType_PATH_MATCH_TYPE_PREFIX,
Value: "/",
},
}},
BackendRefs: []*pbmesh.ComputedHTTPBackendRef{{
BackendTarget: backendName("api", "http", apiServiceRef.Tenancy),
}},
}},
},
},
ParentRef: newParentRef(apiServiceRef, "http"),
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
Targets: map[string]*pbmesh.BackendTargetDetails{
backendName("api", "http", apiServiceRef.Tenancy): {
Type: pbmesh.BackendTargetDetailsType_BACKEND_TARGET_DETAILS_TYPE_DIRECT,
MeshPort: "mesh",
BackendRef: newBackendRef(apiServiceRef, "http", ""),
DestinationConfig: defaultDestConfig(),
FailoverConfig: &pbmesh.ComputedFailoverConfig{
Destinations: []*pbmesh.ComputedFailoverDestination{
{BackendTarget: backendName("api", "http", tenancy)},
},
},
},
},
},
},
}
lastVersion = requireNewComputedRoutesVersion(t, suite.client, computedRoutesID, lastVersion, expect)
})
otherServiceRef := resource.Reference(rtest.Resource(pbcatalog.ServiceType, "other").WithTenancy(tenancy).ID(), "")
otherServiceData := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"other-"}},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
{TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
},
}
otherService := rtest.Resource(pbcatalog.ServiceType, "other").
WithData(suite.T(), otherServiceData).
WithTenancy(tenancy).
Write(suite.T(), suite.client)
failoverPolicyData = &pbcatalog.ComputedFailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{
{
Ref: otherServiceRef,
Port: "http",
},
},
},
},
BoundReferences: []*pbresource.Reference{otherServiceRef},
}
_ = rtest.Resource(pbcatalog.ComputedFailoverPolicyType, "api").
WithTenancy(tenancy).
WithData(suite.T(), failoverPolicyData).
Write(suite.T(), suite.client)
testutil.RunStep(suite.T(), "with other service and updated failover policy", func(t *testing.T) {
// Check that the computed routes resource exists and it has one port that is the default.
expect := &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
newRef(pbcatalog.ComputedFailoverPolicyType, "api", tenancy),
apiServiceRef,
otherServiceRef,
},
PortedConfigs: map[string]*pbmesh.ComputedPortRoutes{
"http": {
UsingDefaultConfig: true,
Config: &pbmesh.ComputedPortRoutes_Http{
Http: &pbmesh.ComputedHTTPRoute{
Rules: []*pbmesh.ComputedHTTPRouteRule{{
Matches: []*pbmesh.HTTPRouteMatch{{
Path: &pbmesh.HTTPPathMatch{
Type: pbmesh.PathMatchType_PATH_MATCH_TYPE_PREFIX,
Value: "/",
},
}},
BackendRefs: []*pbmesh.ComputedHTTPBackendRef{{
BackendTarget: backendName("api", "http", apiServiceRef.Tenancy),
}},
}},
},
},
ParentRef: newParentRef(apiServiceRef, "http"),
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
Targets: map[string]*pbmesh.BackendTargetDetails{
backendName("api", "http", apiServiceRef.Tenancy): {
Type: pbmesh.BackendTargetDetailsType_BACKEND_TARGET_DETAILS_TYPE_DIRECT,
MeshPort: "mesh",
BackendRef: newBackendRef(apiServiceRef, "http", ""),
DestinationConfig: defaultDestConfig(),
FailoverConfig: &pbmesh.ComputedFailoverConfig{
Destinations: []*pbmesh.ComputedFailoverDestination{
{BackendTarget: backendName("other", "http", tenancy)},
},
},
},
backendName("other", "http", otherServiceRef.Tenancy): {
Type: pbmesh.BackendTargetDetailsType_BACKEND_TARGET_DETAILS_TYPE_INDIRECT,
MeshPort: "mesh",
BackendRef: newBackendRef(otherServiceRef, "http", ""),
DestinationConfig: defaultDestConfig(),
},
},
},
},
}
lastVersion = requireNewComputedRoutesVersion(t, suite.client, computedRoutesID, lastVersion, expect)
})
suite.T().Log("delete other service")
suite.client.MustDelete(suite.T(), otherService.Id)
testutil.RunStep(suite.T(), "api computed routes is recoinciled", func(t *testing.T) {
// Check that the computed routes resource exists and it has one port that is the default.
expect := &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
newRef(pbcatalog.ComputedFailoverPolicyType, "api", tenancy),
apiServiceRef,
},
PortedConfigs: map[string]*pbmesh.ComputedPortRoutes{
"http": {
UsingDefaultConfig: true,
Config: &pbmesh.ComputedPortRoutes_Http{
Http: &pbmesh.ComputedHTTPRoute{
Rules: []*pbmesh.ComputedHTTPRouteRule{{
Matches: []*pbmesh.HTTPRouteMatch{{
Path: &pbmesh.HTTPPathMatch{
Type: pbmesh.PathMatchType_PATH_MATCH_TYPE_PREFIX,
Value: "/",
},
}},
BackendRefs: []*pbmesh.ComputedHTTPBackendRef{{
BackendTarget: backendName("api", "http", apiServiceRef.Tenancy),
}},
}},
},
},
ParentRef: newParentRef(apiServiceRef, "http"),
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
Targets: map[string]*pbmesh.BackendTargetDetails{
backendName("api", "http", apiServiceRef.Tenancy): {
Type: pbmesh.BackendTargetDetailsType_BACKEND_TARGET_DETAILS_TYPE_DIRECT,
MeshPort: "mesh",
BackendRef: newBackendRef(apiServiceRef, "http", ""),
DestinationConfig: defaultDestConfig(),
FailoverConfig: &pbmesh.ComputedFailoverConfig{},
},
},
},
},
}
lastVersion = requireNewComputedRoutesVersion(t, suite.client, computedRoutesID, lastVersion, expect)
})
}
}
func newParentRef(ref *pbresource.Reference, port string) *pbmesh.ParentReference {
return &pbmesh.ParentReference{
Ref: ref,

View File

@ -10,7 +10,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/routes/loader"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
@ -314,10 +313,9 @@ func compile(
}
boundRefCollector.AddRefOrID(svcRef)
failoverPolicy := related.GetFailoverPolicyForService(svcRef)
failoverPolicy := related.GetComputedFailoverPolicyForService(svcRef)
if failoverPolicy != nil {
simpleFailoverPolicy := catalog.SimplifyFailoverPolicy(svc.Data, failoverPolicy.Data)
portFailoverConfig, ok := simpleFailoverPolicy.PortConfigs[details.BackendRef.Port]
portFailoverConfig, ok := failoverPolicy.Data.PortConfigs[details.BackendRef.Port]
if ok {
boundRefCollector.AddRefOrID(failoverPolicy.Resource.Id)

View File

@ -96,12 +96,12 @@ func TestGenerateComputedRoutes(t *testing.T) {
return rtest.MustDecode[*pbmesh.DestinationPolicy](t, policy)
}
newFailPolicy := func(name string, data *pbcatalog.FailoverPolicy) *types.DecodedFailoverPolicy {
policy := rtest.Resource(pbcatalog.FailoverPolicyType, name).
newFailPolicy := func(name string, data *pbcatalog.ComputedFailoverPolicy) *types.DecodedComputedFailoverPolicy {
policy := rtest.Resource(pbcatalog.ComputedFailoverPolicyType, name).
WithTenancy(tenancy).
WithData(t, data).Build()
rtest.ValidateAndNormalize(t, registry, policy)
return rtest.MustDecode[*pbcatalog.FailoverPolicy](t, policy)
return rtest.MustDecode[*pbcatalog.ComputedFailoverPolicy](t, policy)
}
backendName := func(name, port string) string {
@ -1606,11 +1606,18 @@ func TestGenerateComputedRoutes(t *testing.T) {
failoverPolicy := &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: []*pbcatalog.FailoverDestination{
{Ref: barServiceRef},
{Ref: barServiceRef}, // port is not supported in non-ported config
{Ref: deadServiceRef}, // no service
},
},
}
simplifiedFailoverPolicy := catalog.SimplifyFailoverPolicy(fooServiceData, failoverPolicy)
computedFailoverPolicy := &pbcatalog.ComputedFailoverPolicy{
PortConfigs: simplifiedFailoverPolicy.PortConfigs,
BoundReferences: []*pbresource.Reference{barServiceRef},
}
portFailoverConfig := &pbmesh.ComputedFailoverConfig{
Destinations: []*pbmesh.ComputedFailoverDestination{
{BackendTarget: backendName("bar", "http")},
@ -1625,7 +1632,7 @@ func TestGenerateComputedRoutes(t *testing.T) {
newService("foo", fooServiceData),
newService("bar", barServiceData),
newHTTPRoute("api-http-route1", httpRoute1),
newFailPolicy("foo", failoverPolicy),
newFailPolicy("foo", computedFailoverPolicy),
)
expect := []*ComputedRoutesResult{{
@ -1633,7 +1640,7 @@ func TestGenerateComputedRoutes(t *testing.T) {
OwnerID: apiServiceID,
Data: &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
newRef(pbcatalog.FailoverPolicyType, "foo", tenancy),
newRef(pbcatalog.ComputedFailoverPolicyType, "foo", tenancy),
apiServiceRef,
barServiceRef,
fooServiceRef,
@ -1860,9 +1867,17 @@ func TestGenerateComputedRoutes(t *testing.T) {
},
},
}
simplifiedFailoverPolicy := catalog.SimplifyFailoverPolicy(fooServiceData, failoverPolicy)
computedFailoverPolicy := &pbcatalog.ComputedFailoverPolicy{
PortConfigs: simplifiedFailoverPolicy.PortConfigs,
BoundReferences: []*pbresource.Reference{barServiceRef},
}
// Test ported config if used in test case
if configKeyPortFoo != "" {
failoverPolicy = &pbcatalog.FailoverPolicy{
failoverPolicy := &pbcatalog.FailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
configKeyPortFoo: {
Destinations: []*pbcatalog.FailoverDestination{
@ -1872,6 +1887,13 @@ func TestGenerateComputedRoutes(t *testing.T) {
},
},
}
simplifiedFailoverPolicy = catalog.SimplifyFailoverPolicy(fooServiceData, failoverPolicy)
computedFailoverPolicy = &pbcatalog.ComputedFailoverPolicy{
PortConfigs: simplifiedFailoverPolicy.PortConfigs,
BoundReferences: []*pbresource.Reference{barServiceRef},
}
}
expectedPortFailoverConfig := &pbmesh.ComputedFailoverConfig{
Destinations: []*pbmesh.ComputedFailoverDestination{
@ -1887,7 +1909,7 @@ func TestGenerateComputedRoutes(t *testing.T) {
newService("foo", fooServiceData),
newService("bar", barServiceData),
newHTTPRoute("api-http-route1", httpRoute1),
newFailPolicy("foo", failoverPolicy),
newFailPolicy("foo", computedFailoverPolicy),
)
// Same result as non-virtual-port variant of test
@ -1896,7 +1918,7 @@ func TestGenerateComputedRoutes(t *testing.T) {
OwnerID: apiServiceID,
Data: &pbmesh.ComputedRoutes{
BoundReferences: []*pbresource.Reference{
newRef(pbcatalog.FailoverPolicyType, "foo", tenancy),
newRef(pbcatalog.ComputedFailoverPolicyType, "foo", tenancy),
apiServiceRef,
barServiceRef,
fooServiceRef,

View File

@ -217,14 +217,14 @@ func (l *loader) loadUpstreamService(
if service != nil {
l.out.AddService(service)
failoverPolicyID := changeResourceType(svcID, pbcatalog.FailoverPolicyType)
failoverPolicy, err := l.mem.GetFailoverPolicy(ctx, failoverPolicyID)
failoverPolicyID := changeResourceType(svcID, pbcatalog.ComputedFailoverPolicyType)
failoverPolicy, err := l.mem.GetComputedFailoverPolicy(ctx, failoverPolicyID)
if err != nil {
logger.Error("error retrieving the failover policy", "failoverPolicyID", failoverPolicyID, "error", err)
logger.Error("error retrieving the computed failover policy", "computedFailoverPolicyID", failoverPolicyID, "error", err)
return err
}
if failoverPolicy != nil {
l.out.AddFailoverPolicy(failoverPolicy)
l.out.AddComputedFailoverPolicy(failoverPolicy)
destRefs := failoverPolicy.Data.GetUnderlyingDestinationRefs()
for _, destRef := range destRefs {

View File

@ -33,7 +33,7 @@ func TestLoadResourcesForComputedRoutes(t *testing.T) {
// temporarily creating the cache here until we can get rid of the xroutemapper object entirely. Its not super clean to hack together a cache for usage in this func
// but its better than alternatives and this should be relatively short lived.
testCache := cache.New()
testCache.AddIndex(pbcatalog.FailoverPolicyType, indexers.RefOrIDIndex("dest-refs", func(res *resource.DecodedResource[*pbcatalog.FailoverPolicy]) []*pbresource.Reference {
testCache.AddIndex(pbcatalog.ComputedFailoverPolicyType, indexers.RefOrIDIndex("dest-refs", func(res *resource.DecodedResource[*pbcatalog.ComputedFailoverPolicy]) []*pbresource.Reference {
return res.Data.GetUnderlyingDestinationRefs()
}))
@ -53,7 +53,7 @@ func TestLoadResourcesForComputedRoutes(t *testing.T) {
}
mapper := xroutemapper.New(func(_ context.Context, rt controller.Runtime, id *pbresource.ID) ([]*pbresource.ID, error) {
iter, err := rt.Cache.ListIterator(pbcatalog.FailoverPolicyType, "dest-refs", id)
iter, err := rt.Cache.ListIterator(pbcatalog.ComputedFailoverPolicyType, "dest-refs", id)
if err != nil {
return nil, err
}
@ -121,12 +121,12 @@ func TestLoadResourcesForComputedRoutes(t *testing.T) {
return dec
}
writeFailover := func(name string, data *pbcatalog.FailoverPolicy) *types.DecodedFailoverPolicy {
res := rtest.Resource(pbcatalog.FailoverPolicyType, name).
writeFailover := func(name string, data *pbcatalog.ComputedFailoverPolicy) *types.DecodedComputedFailoverPolicy {
res := rtest.Resource(pbcatalog.ComputedFailoverPolicyType, name).
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(t, data).
Write(t, client)
dec, err := resource.Decode[*pbcatalog.FailoverPolicy](res)
dec, err := resource.Decode[*pbcatalog.ComputedFailoverPolicy](res)
require.NoError(t, err)
return dec
}
@ -346,12 +346,18 @@ func TestLoadResourcesForComputedRoutes(t *testing.T) {
), out.RoutesByParentRef)
})
barFailover := writeFailover("bar", &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: []*pbcatalog.FailoverDestination{{
Ref: newRef(pbcatalog.ServiceType, "admin"),
}},
barFailover := writeFailover("bar", &pbcatalog.ComputedFailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{
{
Ref: newRef(pbcatalog.ServiceType, "admin"),
Port: "http",
},
},
},
},
BoundReferences: []*pbresource.Reference{newRef(pbcatalog.ServiceType, "admin")},
})
testutil.RunStep(t, "add a failover", func(t *testing.T) {

View File

@ -19,12 +19,12 @@ import (
type memoizingLoader struct {
client pbresource.ResourceServiceClient
mapHTTPRoute map[resource.ReferenceKey]*types.DecodedHTTPRoute
mapGRPCRoute map[resource.ReferenceKey]*types.DecodedGRPCRoute
mapTCPRoute map[resource.ReferenceKey]*types.DecodedTCPRoute
mapDestinationPolicy map[resource.ReferenceKey]*types.DecodedDestinationPolicy
mapFailoverPolicy map[resource.ReferenceKey]*types.DecodedFailoverPolicy
mapService map[resource.ReferenceKey]*types.DecodedService
mapHTTPRoute map[resource.ReferenceKey]*types.DecodedHTTPRoute
mapGRPCRoute map[resource.ReferenceKey]*types.DecodedGRPCRoute
mapTCPRoute map[resource.ReferenceKey]*types.DecodedTCPRoute
mapDestinationPolicy map[resource.ReferenceKey]*types.DecodedDestinationPolicy
mapComputedFailoverPolicy map[resource.ReferenceKey]*types.DecodedComputedFailoverPolicy
mapService map[resource.ReferenceKey]*types.DecodedService
}
func newMemoizingLoader(client pbresource.ResourceServiceClient) *memoizingLoader {
@ -32,13 +32,13 @@ func newMemoizingLoader(client pbresource.ResourceServiceClient) *memoizingLoade
panic("client is required")
}
return &memoizingLoader{
client: client,
mapHTTPRoute: make(map[resource.ReferenceKey]*types.DecodedHTTPRoute),
mapGRPCRoute: make(map[resource.ReferenceKey]*types.DecodedGRPCRoute),
mapTCPRoute: make(map[resource.ReferenceKey]*types.DecodedTCPRoute),
mapDestinationPolicy: make(map[resource.ReferenceKey]*types.DecodedDestinationPolicy),
mapFailoverPolicy: make(map[resource.ReferenceKey]*types.DecodedFailoverPolicy),
mapService: make(map[resource.ReferenceKey]*types.DecodedService),
client: client,
mapHTTPRoute: make(map[resource.ReferenceKey]*types.DecodedHTTPRoute),
mapGRPCRoute: make(map[resource.ReferenceKey]*types.DecodedGRPCRoute),
mapTCPRoute: make(map[resource.ReferenceKey]*types.DecodedTCPRoute),
mapDestinationPolicy: make(map[resource.ReferenceKey]*types.DecodedDestinationPolicy),
mapComputedFailoverPolicy: make(map[resource.ReferenceKey]*types.DecodedComputedFailoverPolicy),
mapService: make(map[resource.ReferenceKey]*types.DecodedService),
}
}
@ -58,8 +58,8 @@ func (m *memoizingLoader) GetDestinationPolicy(ctx context.Context, id *pbresour
return getOrCacheResource[*pbmesh.DestinationPolicy](ctx, m.client, m.mapDestinationPolicy, pbmesh.DestinationPolicyType, id)
}
func (m *memoizingLoader) GetFailoverPolicy(ctx context.Context, id *pbresource.ID) (*types.DecodedFailoverPolicy, error) {
return getOrCacheResource[*pbcatalog.FailoverPolicy](ctx, m.client, m.mapFailoverPolicy, pbcatalog.FailoverPolicyType, id)
func (m *memoizingLoader) GetComputedFailoverPolicy(ctx context.Context, id *pbresource.ID) (*types.DecodedComputedFailoverPolicy, error) {
return getOrCacheResource[*pbcatalog.ComputedFailoverPolicy](ctx, m.client, m.mapComputedFailoverPolicy, pbcatalog.ComputedFailoverPolicyType, id)
}
func (m *memoizingLoader) GetService(ctx context.Context, id *pbresource.ID) (*types.DecodedService, error) {

View File

@ -17,24 +17,24 @@ import (
type RelatedResources struct {
ComputedRoutesList []*pbresource.ID
// RoutesByParentRef is a map of a parent Service to the xRoutes that compose it.
RoutesByParentRef map[resource.ReferenceKey]map[resource.ReferenceKey]struct{}
HTTPRoutes map[resource.ReferenceKey]*types.DecodedHTTPRoute
GRPCRoutes map[resource.ReferenceKey]*types.DecodedGRPCRoute
TCPRoutes map[resource.ReferenceKey]*types.DecodedTCPRoute
Services map[resource.ReferenceKey]*types.DecodedService
FailoverPolicies map[resource.ReferenceKey]*types.DecodedFailoverPolicy
DestinationPolicies map[resource.ReferenceKey]*types.DecodedDestinationPolicy
RoutesByParentRef map[resource.ReferenceKey]map[resource.ReferenceKey]struct{}
HTTPRoutes map[resource.ReferenceKey]*types.DecodedHTTPRoute
GRPCRoutes map[resource.ReferenceKey]*types.DecodedGRPCRoute
TCPRoutes map[resource.ReferenceKey]*types.DecodedTCPRoute
Services map[resource.ReferenceKey]*types.DecodedService
ComputedFailoverPolicies map[resource.ReferenceKey]*types.DecodedComputedFailoverPolicy
DestinationPolicies map[resource.ReferenceKey]*types.DecodedDestinationPolicy
}
func NewRelatedResources() *RelatedResources {
return &RelatedResources{
RoutesByParentRef: make(map[resource.ReferenceKey]map[resource.ReferenceKey]struct{}),
HTTPRoutes: make(map[resource.ReferenceKey]*types.DecodedHTTPRoute),
GRPCRoutes: make(map[resource.ReferenceKey]*types.DecodedGRPCRoute),
TCPRoutes: make(map[resource.ReferenceKey]*types.DecodedTCPRoute),
Services: make(map[resource.ReferenceKey]*types.DecodedService),
FailoverPolicies: make(map[resource.ReferenceKey]*types.DecodedFailoverPolicy),
DestinationPolicies: make(map[resource.ReferenceKey]*types.DecodedDestinationPolicy),
RoutesByParentRef: make(map[resource.ReferenceKey]map[resource.ReferenceKey]struct{}),
HTTPRoutes: make(map[resource.ReferenceKey]*types.DecodedHTTPRoute),
GRPCRoutes: make(map[resource.ReferenceKey]*types.DecodedGRPCRoute),
TCPRoutes: make(map[resource.ReferenceKey]*types.DecodedTCPRoute),
Services: make(map[resource.ReferenceKey]*types.DecodedService),
ComputedFailoverPolicies: make(map[resource.ReferenceKey]*types.DecodedComputedFailoverPolicy),
DestinationPolicies: make(map[resource.ReferenceKey]*types.DecodedDestinationPolicy),
}
}
@ -85,8 +85,8 @@ func (r *RelatedResources) AddResource(res any) {
r.AddDestinationPolicy(dec)
case *types.DecodedService:
r.AddService(dec)
case *types.DecodedFailoverPolicy:
r.AddFailoverPolicy(dec)
case *types.DecodedComputedFailoverPolicy:
r.AddComputedFailoverPolicy(dec)
default:
panic(fmt.Sprintf("unknown decoded resource type: %T", res))
}
@ -115,8 +115,8 @@ func (r *RelatedResources) AddService(dec *types.DecodedService) {
addResource(dec.Resource.Id, dec, r.Services)
}
func (r *RelatedResources) AddFailoverPolicy(dec *types.DecodedFailoverPolicy) {
addResource(dec.Resource.Id, dec, r.FailoverPolicies)
func (r *RelatedResources) AddComputedFailoverPolicy(dec *types.DecodedComputedFailoverPolicy) {
addResource(dec.Resource.Id, dec, r.ComputedFailoverPolicies)
}
func (r *RelatedResources) addRouteSetEntries(
@ -195,17 +195,17 @@ func (r *RelatedResources) GetService(ref resource.ReferenceOrID) *types.Decoded
return r.Services[resource.NewReferenceKey(ref)]
}
func (r *RelatedResources) GetFailoverPolicy(ref resource.ReferenceOrID) *types.DecodedFailoverPolicy {
return r.FailoverPolicies[resource.NewReferenceKey(ref)]
func (r *RelatedResources) GetComputedFailoverPolicy(ref resource.ReferenceOrID) *types.DecodedComputedFailoverPolicy {
return r.ComputedFailoverPolicies[resource.NewReferenceKey(ref)]
}
func (r *RelatedResources) GetFailoverPolicyForService(ref resource.ReferenceOrID) *types.DecodedFailoverPolicy {
func (r *RelatedResources) GetComputedFailoverPolicyForService(ref resource.ReferenceOrID) *types.DecodedComputedFailoverPolicy {
failRef := &pbresource.Reference{
Type: pbcatalog.FailoverPolicyType,
Type: pbcatalog.ComputedFailoverPolicyType,
Tenancy: ref.GetTenancy(),
Name: ref.GetName(),
}
return r.GetFailoverPolicy(failRef)
return r.GetComputedFailoverPolicy(failRef)
}
func (r *RelatedResources) GetDestinationPolicy(ref resource.ReferenceOrID) *types.DecodedDestinationPolicy {

View File

@ -16,13 +16,13 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
type ResolveFailoverServiceDestinations func(context.Context, controller.Runtime, *pbresource.ID) ([]*pbresource.ID, error)
type ResolveComputedFailoverServiceDestinations func(context.Context, controller.Runtime, *pbresource.ID) ([]*pbresource.ID, error)
// Mapper tracks the following relationships:
//
// - xRoute <-> ParentRef Service
// - xRoute <-> BackendRef Service
// - FailoverPolicy <-> DestRef Service
// - xRoute <-> ParentRef Service
// - xRoute <-> BackendRef Service
// - ComputedFailoverPolicy <-> DestRef Service
//
// It is the job of the controller, loader, and mapper to keep the mappings up
// to date whenever new data is loaded. Notably because the dep mapper events
@ -40,11 +40,11 @@ type Mapper struct {
grpcRouteBackendMapper *bimapper.Mapper
tcpRouteBackendMapper *bimapper.Mapper
resolveFailoverServiceDestinations ResolveFailoverServiceDestinations
resolveComputedFailoverServiceDestinations ResolveComputedFailoverServiceDestinations
}
// New creates a new Mapper.
func New(resolver ResolveFailoverServiceDestinations) *Mapper {
func New(resolver ResolveComputedFailoverServiceDestinations) *Mapper {
if resolver == nil {
panic("must specify a ResolveFailoverServiceDestinations callback")
}
@ -59,7 +59,7 @@ func New(resolver ResolveFailoverServiceDestinations) *Mapper {
grpcRouteBackendMapper: bimapper.New(pbmesh.GRPCRouteType, pbcatalog.ServiceType),
tcpRouteBackendMapper: bimapper.New(pbmesh.TCPRouteType, pbcatalog.ServiceType),
resolveFailoverServiceDestinations: resolver,
resolveComputedFailoverServiceDestinations: resolver,
}
}
@ -235,7 +235,7 @@ func (m *Mapper) MapService(
// 2. xRoute[parentRef=OUTPUT_EVENT; backendRef=SOMETHING], FailoverPolicy[name=SOMETHING, destRef=INPUT_EVENT]
// (case 2) First find all failover policies that have a reference to our input service.
effectiveServiceIDs, err := m.resolveFailoverServiceDestinations(ctx, rt, res.Id)
effectiveServiceIDs, err := m.resolveComputedFailoverServiceDestinations(ctx, rt, res.Id)
if err != nil {
return nil, err
}

View File

@ -83,12 +83,12 @@ func testMapper_Tracking(t *testing.T, typ *pbresource.Type, newRoute func(t *te
// temporarily creating the cache here until we can get rid of this xroutemapper object entirely. Its not super clean to hack together a cache for usage in this func
// but its better than alternatives and this should be relatively short lived.
testCache := cache.New()
testCache.AddIndex(pbcatalog.FailoverPolicyType, indexers.RefOrIDIndex("dest-refs", func(res *resource.DecodedResource[*pbcatalog.FailoverPolicy]) []*pbresource.Reference {
testCache.AddIndex(pbcatalog.ComputedFailoverPolicyType, indexers.RefOrIDIndex("dest-refs", func(res *resource.DecodedResource[*pbcatalog.ComputedFailoverPolicy]) []*pbresource.Reference {
return res.Data.GetUnderlyingDestinationRefs()
}))
m := New(func(_ context.Context, rt controller.Runtime, id *pbresource.ID) ([]*pbresource.ID, error) {
iter, err := rt.Cache.ListIterator(pbcatalog.FailoverPolicyType, "dest-refs", id)
iter, err := rt.Cache.ListIterator(pbcatalog.ComputedFailoverPolicyType, "dest-refs", id)
if err != nil {
return nil, err
}
@ -131,12 +131,22 @@ func testMapper_Tracking(t *testing.T, typ *pbresource.Type, newRoute func(t *te
Ref: ref,
})
}
policy := rtest.Resource(pbcatalog.FailoverPolicyType, name).
failoverPolicy := &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: dests,
},
}
simiplifiedFailoverPolicy := catalog.SimplifyFailoverPolicy(&pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
}, failoverPolicy)
policy := rtest.Resource(pbcatalog.ComputedFailoverPolicyType, name).
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(t, &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: dests,
},
WithData(t, &pbcatalog.ComputedFailoverPolicy{
PortConfigs: simiplifiedFailoverPolicy.PortConfigs,
BoundReferences: refs,
}).Build()
rtest.ValidateAndNormalize(t, registry, policy)
return policy
@ -661,7 +671,7 @@ func requireTracking(
reqs, err = mapper.MapTCPRoute(context.Background(), rt, res)
case resource.EqualType(pbmesh.DestinationPolicyType, res.Id.Type):
reqs, err = mapper.MapServiceNameAligned(context.Background(), rt, res)
case resource.EqualType(pbcatalog.FailoverPolicyType, res.Id.Type):
case resource.EqualType(pbcatalog.ComputedFailoverPolicyType, res.Id.Type):
c.Insert(res)
reqs, err = mapper.MapServiceNameAligned(context.Background(), rt, res)
case resource.EqualType(pbcatalog.ServiceType, res.Id.Type):

View File

@ -209,9 +209,9 @@ func TestBuildExplicitDestinations(t *testing.T) {
Build()
resourcetest.ValidateAndNormalize(t, registry, api1HTTPRoute)
api1FailoverPolicy := resourcetest.Resource(pbcatalog.FailoverPolicyType, "api-1").
api1FailoverPolicy := resourcetest.Resource(pbcatalog.ComputedFailoverPolicyType, "api-1").
WithTenancy(tenancy).
WithData(t, &pbcatalog.FailoverPolicy{
WithData(t, &pbcatalog.ComputedFailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{{
@ -298,7 +298,7 @@ func TestBuildExplicitDestinations(t *testing.T) {
resourcetest.MustDecode[*pbmesh.DestinationPolicy](t, api1DestPolicy),
resourcetest.MustDecode[*pbmesh.HTTPRoute](t, api1HTTPRoute),
resourcetest.MustDecode[*pbmesh.TCPRoute](t, api1TCPRoute),
resourcetest.MustDecode[*pbcatalog.FailoverPolicy](t, api1FailoverPolicy),
resourcetest.MustDecode[*pbcatalog.ComputedFailoverPolicy](t, api1FailoverPolicy),
resourcetest.MustDecode[*pbmesh.TCPRoute](t, api1TCP2Route),
)
require.NotNil(t, api1ComputedRoutes)

View File

@ -201,14 +201,19 @@ func TestUnified_AllMappingsToProxyStateTemplate(t *testing.T) {
}},
}).
Build()
failoverPolicy := resourcetest.ResourceID(resource.ReplaceType(pbcatalog.FailoverPolicyType, targetService.Id)).
failoverPolicy := &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: []*pbcatalog.FailoverDestination{{
Ref: backupTargetServiceRef,
}},
},
}
simiplifiedFailoverPolicy := catalog.SimplifyFailoverPolicy(anyServiceData, failoverPolicy)
computedFailoverPolicy := resourcetest.ResourceID(resource.ReplaceType(pbcatalog.ComputedFailoverPolicyType, targetService.Id)).
WithTenancy(tenancy).
WithData(t, &pbcatalog.FailoverPolicy{
Config: &pbcatalog.FailoverConfig{
Destinations: []*pbcatalog.FailoverDestination{{
Ref: backupTargetServiceRef,
}},
},
WithData(t, &pbcatalog.ComputedFailoverPolicy{
PortConfigs: simiplifiedFailoverPolicy.PortConfigs,
}).
Build()
webRoutes := routestest.BuildComputedRoutes(t, resource.ReplaceType(pbmesh.ComputedRoutesType, destService.Id),
@ -216,7 +221,7 @@ func TestUnified_AllMappingsToProxyStateTemplate(t *testing.T) {
resourcetest.MustDecode[*pbcatalog.Service](t, targetService),
resourcetest.MustDecode[*pbcatalog.Service](t, backupTargetService),
resourcetest.MustDecode[*pbmesh.TCPRoute](t, tcpRoute),
resourcetest.MustDecode[*pbcatalog.FailoverPolicy](t, failoverPolicy),
resourcetest.MustDecode[*pbcatalog.ComputedFailoverPolicy](t, computedFailoverPolicy),
)
var (

View File

@ -19,7 +19,7 @@ type (
DecodedDestinationsConfiguration = resource.DecodedResource[*pbmesh.DestinationsConfiguration]
DecodedComputedRoutes = resource.DecodedResource[*pbmesh.ComputedRoutes]
DecodedComputedTrafficPermissions = resource.DecodedResource[*pbauth.ComputedTrafficPermissions]
DecodedFailoverPolicy = resource.DecodedResource[*pbcatalog.FailoverPolicy]
DecodedComputedFailoverPolicy = resource.DecodedResource[*pbcatalog.ComputedFailoverPolicy]
DecodedService = resource.DecodedResource[*pbcatalog.Service]
DecodedServiceEndpoints = resource.DecodedResource[*pbcatalog.ServiceEndpoints]
DecodedWorkload = resource.DecodedResource[*pbcatalog.Workload]

View File

@ -0,0 +1,48 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package catalogv2beta1
import "github.com/hashicorp/consul/proto-public/pbresource"
// GetUnderlyingDestinations will collect FailoverDestinations from all
// internal fields and bundle them up in one slice.
//
// NOTE: no deduplication occurs.
func (x *ComputedFailoverPolicy) GetUnderlyingDestinations() []*FailoverDestination {
if x == nil {
return nil
}
estimate := 0
for _, pc := range x.PortConfigs {
estimate += len(pc.Destinations)
}
out := make([]*FailoverDestination, 0, estimate)
for _, pc := range x.PortConfigs {
out = append(out, pc.Destinations...)
}
return out
}
// GetUnderlyingDestinationRefs is like GetUnderlyingDestinations except it
// returns a slice of References.
//
// NOTE: no deduplication occurs.
func (x *ComputedFailoverPolicy) GetUnderlyingDestinationRefs() []*pbresource.Reference {
if x == nil {
return nil
}
dests := x.GetUnderlyingDestinations()
out := make([]*pbresource.Reference, 0, len(dests))
for _, dest := range dests {
if dest.Ref != nil {
out = append(out, dest.Ref)
}
}
return out
}

View File

@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package catalogv2beta1
import (
"testing"
"github.com/hashicorp/consul/proto-public/pbresource"
)
func TestComputedFailoverPolicy_GetUnderlyingDestinations_AndRefs(t *testing.T) {
type testcase struct {
failover *ComputedFailoverPolicy
expectDests []*FailoverDestination
expectRefs []*pbresource.Reference
}
run := func(t *testing.T, tc testcase) {
assertSliceEquals(t, tc.expectDests, tc.failover.GetUnderlyingDestinations())
assertSliceEquals(t, tc.expectRefs, tc.failover.GetUnderlyingDestinationRefs())
}
cases := map[string]testcase{
"nil": {},
"kitchen sink dests": {
failover: &ComputedFailoverPolicy{
PortConfigs: map[string]*FailoverConfig{
"http": {
Destinations: []*FailoverDestination{
newFailoverDestination("foo"),
newFailoverDestination("bar"),
},
},
"admin": {
Destinations: []*FailoverDestination{
newFailoverDestination("admin"),
},
},
"web": {
Destinations: []*FailoverDestination{
newFailoverDestination("foo"), // duplicated
newFailoverDestination("www"),
},
},
},
},
expectDests: []*FailoverDestination{
newFailoverDestination("foo"),
newFailoverDestination("bar"),
newFailoverDestination("admin"),
newFailoverDestination("foo"), // duplicated
newFailoverDestination("www"),
},
expectRefs: []*pbresource.Reference{
newFailoverRef("foo"),
newFailoverRef("bar"),
newFailoverRef("admin"),
newFailoverRef("foo"), // duplicated
newFailoverRef("www"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}

View File

@ -3,56 +3,6 @@
package catalogv2beta1
import "github.com/hashicorp/consul/proto-public/pbresource"
// GetUnderlyingDestinations will collect FailoverDestinations from all
// internal fields and bundle them up in one slice.
//
// NOTE: no deduplication occurs.
func (x *FailoverPolicy) GetUnderlyingDestinations() []*FailoverDestination {
if x == nil {
return nil
}
estimate := 0
if x.Config != nil {
estimate += len(x.Config.Destinations)
}
for _, pc := range x.PortConfigs {
estimate += len(pc.Destinations)
}
out := make([]*FailoverDestination, 0, estimate)
if x.Config != nil {
out = append(out, x.Config.Destinations...)
}
for _, pc := range x.PortConfigs {
out = append(out, pc.Destinations...)
}
return out
}
// GetUnderlyingDestinationRefs is like GetUnderlyingDestinations except it
// returns a slice of References.
//
// NOTE: no deduplication occurs.
func (x *FailoverPolicy) GetUnderlyingDestinationRefs() []*pbresource.Reference {
if x == nil {
return nil
}
dests := x.GetUnderlyingDestinations()
out := make([]*pbresource.Reference, 0, len(dests))
for _, dest := range dests {
if dest.Ref != nil {
out = append(out, dest.Ref)
}
}
return out
}
// IsEmpty returns true if a config has no definition.
func (x *FailoverConfig) IsEmpty() bool {
if x == nil {

View File

@ -44,66 +44,6 @@ func TestFailoverPolicy_IsEmpty(t *testing.T) {
})
}
func TestFailoverPolicy_GetUnderlyingDestinations_AndRefs(t *testing.T) {
type testcase struct {
failover *FailoverPolicy
expectDests []*FailoverDestination
expectRefs []*pbresource.Reference
}
run := func(t *testing.T, tc testcase) {
assertSliceEquals(t, tc.expectDests, tc.failover.GetUnderlyingDestinations())
assertSliceEquals(t, tc.expectRefs, tc.failover.GetUnderlyingDestinationRefs())
}
cases := map[string]testcase{
"nil": {},
"kitchen sink dests": {
failover: &FailoverPolicy{
Config: &FailoverConfig{
Destinations: []*FailoverDestination{
newFailoverDestination("foo"),
newFailoverDestination("bar"),
},
},
PortConfigs: map[string]*FailoverConfig{
"admin": {
Destinations: []*FailoverDestination{
newFailoverDestination("admin"),
},
},
"web": {
Destinations: []*FailoverDestination{
newFailoverDestination("foo"), // duplicated
newFailoverDestination("www"),
},
},
},
},
expectDests: []*FailoverDestination{
newFailoverDestination("foo"),
newFailoverDestination("bar"),
newFailoverDestination("admin"),
newFailoverDestination("foo"), // duplicated
newFailoverDestination("www"),
},
expectRefs: []*pbresource.Reference{
newFailoverRef("foo"),
newFailoverRef("bar"),
newFailoverRef("admin"),
newFailoverRef("foo"), // duplicated
newFailoverRef("www"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
func assertSliceEquals[V proto.Message](t *testing.T, expect, got []V) {
t.Helper()