mirror of https://github.com/hashicorp/consul
* remove v2 tenancy, catalog, and mesh
  - Inline the v2tenancy experiment to false
  - Inline the resource-apis experiment to false
  - Inline the hcp-v2-resource-apis experiment to false
  - Remove ACL policy templates and rule language changes related to workload identities (a v2-only concept), e.g. identity and identity_prefix
  - Update the gRPC endpoint used by consul-dataplane to no longer respond specially for v2
  - Remove stray v2 references scattered throughout the newer DNS v1.5 implementation
* changelog
* go mod tidy on consul containers
* lint fixes from ENT
---------
Co-authored-by: John Murret <john.murret@hashicorp.com>

pull/21682/head
R.B. Boyer authored 3 months ago, committed by GitHub
1247 changed files with 971 additions and 186777 deletions
@@ -0,0 +1,3 @@
```release-note:feature
server: remove v2 tenancy, catalog, and mesh experiments
```
@@ -1,40 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package connect

import (
    "fmt"
    "net/url"
)

// SpiffeIDWorkloadIdentity is the structure to represent the SPIFFE ID for a workload.
type SpiffeIDWorkloadIdentity struct {
    TrustDomain      string
    Partition        string
    Namespace        string
    WorkloadIdentity string
}

// URI returns the *url.URL for this SPIFFE ID.
func (id SpiffeIDWorkloadIdentity) URI() *url.URL {
    var result url.URL
    result.Scheme = "spiffe"
    result.Host = id.TrustDomain
    result.Path = id.uriPath()
    return &result
}

func (id SpiffeIDWorkloadIdentity) uriPath() string {
    // Although CE has no support for partitions, it still needs to be able to
    // handle exportedPartition from peered Consul Enterprise clusters in order
    // to generate the correct SpiffeID.
    // We intentionally avoid using pbpartition.DefaultName here to be CE friendly.
    path := fmt.Sprintf("/ap/%s/ns/%s/identity/%s",
        id.Partition,
        id.Namespace,
        id.WorkloadIdentity,
    )

    return path
}
@@ -1,18 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package connect

import (
    "github.com/hashicorp/consul/acl"
)

// TODO: this will need to somehow be updated to set namespace here when we include namespaces in CE

// GetEnterpriseMeta will synthesize an EnterpriseMeta struct from the SpiffeIDWorkloadIdentity.
// In CE this just returns an empty (but never nil) struct pointer.
func (id SpiffeIDWorkloadIdentity) GetEnterpriseMeta() *acl.EnterpriseMeta {
    return &acl.EnterpriseMeta{}
}
@@ -1,31 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package connect

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestSpiffeIDWorkloadURI(t *testing.T) {
    t.Run("spiffe id workload uri default tenancy", func(t *testing.T) {
        wl := &SpiffeIDWorkloadIdentity{
            TrustDomain:      "1234.consul",
            WorkloadIdentity: "web",
            Partition:        "default",
            Namespace:        "default",
        }
        require.Equal(t, "spiffe://1234.consul/ap/default/ns/default/identity/web", wl.URI().String())
    })
    t.Run("spiffe id workload uri non-default tenancy", func(t *testing.T) {
        wl := &SpiffeIDWorkloadIdentity{
            TrustDomain:      "1234.consul",
            WorkloadIdentity: "web",
            Partition:        "part1",
            Namespace:        "dev",
        }
        require.Equal(t, "spiffe://1234.consul/ap/part1/ns/dev/identity/web", wl.URI().String())
    })
}
@@ -1,17 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package consul

import (
    "context"

    "github.com/hashicorp/consul/internal/storage"
)

func (s *Server) createDefaultPartition(ctx context.Context, b storage.Backend) error {
    // no-op
    return nil
}
@@ -1,411 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul

import (
    "context"
    "fmt"
    "strconv"
    "strings"

    "github.com/google/go-cmp/cmp"
    "github.com/google/go-cmp/cmp/cmpopts"
    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/serf/serf"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/testing/protocmp"
    "google.golang.org/protobuf/types/known/anypb"

    "github.com/hashicorp/consul/acl"
    "github.com/hashicorp/consul/agent/metadata"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/internal/resource"
    pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
    "github.com/hashicorp/consul/proto-public/pbresource"
    "github.com/hashicorp/consul/types"
)

const (
    consulWorkloadPrefix = "consul-server-"
    consulPortNameServer = "server"
)

var _ ConsulRegistrator = (*V2ConsulRegistrator)(nil)

var resourceCmpOptions = []cmp.Option{
    protocmp.IgnoreFields(&pbresource.Resource{}, "status", "generation", "version"),
    protocmp.IgnoreFields(&pbresource.ID{}, "uid"),
    protocmp.Transform(),
    // Stringify any type passed to the sorter so that we can reliably compare most values.
    cmpopts.SortSlices(func(a, b any) bool { return fmt.Sprintf("%v", a) < fmt.Sprintf("%v", b) }),
}

type V2ConsulRegistrator struct {
    Logger   hclog.Logger
    NodeName string
    EntMeta  *acl.EnterpriseMeta

    Client pbresource.ResourceServiceClient
}

// HandleAliveMember is used to ensure the server is registered as a Workload
// with a passing health check.
func (r V2ConsulRegistrator) HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error {
    valid, parts := metadata.IsConsulServer(member)
    if !valid {
        return nil
    }

    if nodeEntMeta == nil {
        nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
    }

    // Attempt to join the consul server, regardless of the existing catalog state
    if err := joinServer(member, parts); err != nil {
        return err
    }

    r.Logger.Info("member joined, creating catalog entries",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )

    workloadResource, err := r.createWorkloadFromMember(member, parts, nodeEntMeta)
    if err != nil {
        return err
    }

    // Check if the Workload already exists and if it's the same
    res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadResource.Id})
    if err != nil && !grpcNotFoundErr(err) {
        return fmt.Errorf("error checking for existing Workload %s: %w", workloadResource.Id.Name, err)
    }

    if err == nil {
        existingWorkload := res.GetResource()

        r.Logger.Debug("existing Workload matching the member found",
            "member", member.Name,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        )

        // If the Workload is identical, move to updating the health status
        if cmp.Equal(workloadResource, existingWorkload, resourceCmpOptions...) {
            r.Logger.Debug("no updates to perform on member Workload",
                "member", member.Name,
                "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
            )
            goto HEALTHSTATUS
        }

        // If the existing Workload is different, add the existing Version into the patch for CAS write
        workloadResource.Id = existingWorkload.Id
        workloadResource.Version = existingWorkload.Version
    }

    if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: workloadResource}); err != nil {
        return fmt.Errorf("failed to write Workload %s: %w", workloadResource.Id.Name, err)
    }

    r.Logger.Info("updated consul Workload in catalog",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )

HEALTHSTATUS:
    hsResource, err := r.createHealthStatusFromMember(member, workloadResource.Id, true, nodeEntMeta)
    if err != nil {
        return err
    }

    // Check if the HealthStatus already exists and if it's the same
    res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id})
    if err != nil && !grpcNotFoundErr(err) {
        return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err)
    }

    if err == nil {
        existingHS := res.GetResource()

        r.Logger.Debug("existing HealthStatus matching the member found",
            "member", member.Name,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        )

        // If the HealthStatus is identical, we're done.
        if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) {
            r.Logger.Debug("no updates to perform on member HealthStatus",
                "member", member.Name,
                "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
            )
            return nil
        }

        // If the existing HealthStatus is different, add the Version to the patch for CAS write.
        hsResource.Id = existingHS.Id
        hsResource.Version = existingHS.Version
    }

    if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil {
        return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err)
    }
    r.Logger.Info("updated consul HealthStatus in catalog",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )
    return nil
}

func (r V2ConsulRegistrator) createWorkloadFromMember(member serf.Member, parts *metadata.Server, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) {
    workloadMeta := map[string]string{
        "read_replica":          strconv.FormatBool(member.Tags["read_replica"] == "1"),
        "raft_version":          strconv.Itoa(parts.RaftVersion),
        "serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
        "serf_protocol_min":     strconv.FormatUint(uint64(member.ProtocolMin), 10),
        "serf_protocol_max":     strconv.FormatUint(uint64(member.ProtocolMax), 10),
        "version":               parts.Build.String(),
    }

    if parts.ExternalGRPCPort > 0 {
        workloadMeta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort)
    }
    if parts.ExternalGRPCTLSPort > 0 {
        workloadMeta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort)
    }

    if parts.Port < 0 || parts.Port > 65535 {
        return nil, fmt.Errorf("invalid port: %d", parts.Port)
    }

    workload := &pbcatalog.Workload{
        Addresses: []*pbcatalog.WorkloadAddress{
            {Host: member.Addr.String(), Ports: []string{consulPortNameServer}},
        },
        // Don't include identity since Consul is not routable through the mesh.
        // Don't include locality because these values are not passed along through serf, and they are probably
        // different from the leader's values.
        Ports: map[string]*pbcatalog.WorkloadPort{
            consulPortNameServer: {
                Port:     uint32(parts.Port),
                Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
            },
            // TODO: add other agent ports
        },
    }

    workloadData, err := anypb.New(workload)
    if err != nil {
        return nil, fmt.Errorf("could not convert Workload to 'any' type: %w", err)
    }

    workloadId := &pbresource.ID{
        Name:    fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
        Type:    pbcatalog.WorkloadType,
        Tenancy: resource.DefaultNamespacedTenancy(),
    }
    workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()

    return &pbresource.Resource{
        Id:       workloadId,
        Data:     workloadData,
        Metadata: workloadMeta,
    }, nil
}

func (r V2ConsulRegistrator) createHealthStatusFromMember(member serf.Member, workloadId *pbresource.ID, passing bool, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) {
    hs := &pbcatalog.HealthStatus{
        Type:        string(structs.SerfCheckID),
        Description: structs.SerfCheckName,
    }

    if passing {
        hs.Status = pbcatalog.Health_HEALTH_PASSING
        hs.Output = structs.SerfCheckAliveOutput
    } else {
        hs.Status = pbcatalog.Health_HEALTH_CRITICAL
        hs.Output = structs.SerfCheckFailedOutput
    }

    hsData, err := anypb.New(hs)
    if err != nil {
        return nil, fmt.Errorf("could not convert HealthStatus to 'any' type: %w", err)
    }

    hsId := &pbresource.ID{
        Name:    fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
        Type:    pbcatalog.HealthStatusType,
        Tenancy: resource.DefaultNamespacedTenancy(),
    }
    hsId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()

    return &pbresource.Resource{
        Id:    hsId,
        Data:  hsData,
        Owner: workloadId,
    }, nil
}

// HandleFailedMember is used to mark the workload's associated HealthStatus as critical.
func (r V2ConsulRegistrator) HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
    if valid, _ := metadata.IsConsulServer(member); !valid {
        return nil
    }

    if nodeEntMeta == nil {
        nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
    }

    r.Logger.Info("member failed",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )

    // Validate that the associated workload exists
    workloadId := &pbresource.ID{
        Name:    fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
        Type:    pbcatalog.WorkloadType,
        Tenancy: resource.DefaultNamespacedTenancy(),
    }
    workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()

    res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadId})
    if err != nil && !grpcNotFoundErr(err) {
        return fmt.Errorf("error checking for existing Workload %s: %w", workloadId.Name, err)
    }
    if grpcNotFoundErr(err) {
        r.Logger.Info("ignoring failed event for member because it does not exist in the catalog",
            "member", member.Name,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        )
        return nil
    }
    // Overwrite the workload ID with the one that has UID populated.
    existingWorkload := res.GetResource()

    hsResource, err := r.createHealthStatusFromMember(member, existingWorkload.Id, false, nodeEntMeta)
    if err != nil {
        return err
    }

    res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id})
    if err != nil && !grpcNotFoundErr(err) {
        return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err)
    }

    if err == nil {
        existingHS := res.GetResource()
        r.Logger.Debug("existing HealthStatus matching the member found",
            "member", member.Name,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        )

        // If the HealthStatus is identical, we're done.
        if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) {
            r.Logger.Debug("no updates to perform on member HealthStatus",
                "member", member.Name,
                "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
            )
            return nil
        }

        // If the existing HealthStatus is different, add the Version to the patch for CAS write.
        hsResource.Id = existingHS.Id
        hsResource.Version = existingHS.Version
    }

    if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil {
        return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err)
    }
    r.Logger.Info("updated consul HealthStatus in catalog",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )
    return nil
}

// HandleLeftMember is used to handle members that gracefully
// left. They are removed if necessary.
func (r V2ConsulRegistrator) HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
    return r.handleDeregisterMember("left", member, nodeEntMeta, removeServerFunc)
}

// HandleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are removed from the catalog.
func (r V2ConsulRegistrator) HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
    return r.handleDeregisterMember("reaped", member, nodeEntMeta, removeServerFunc)
}

// handleDeregisterMember is used to remove a member for a given reason
func (r V2ConsulRegistrator) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
    if valid, _ := metadata.IsConsulServer(member); !valid {
        return nil
    }

    if nodeEntMeta == nil {
        nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
    }

    r.Logger.Info("removing member",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        "reason", reason,
    )

    if err := removeServerFunc(member); err != nil {
        return err
    }

    // Do not remove our self. This can only happen if the current leader
    // is leaving. Instead, we should allow a follower to take-over and
    // remove us later.
    if strings.EqualFold(member.Name, r.NodeName) &&
        strings.EqualFold(nodeEntMeta.PartitionOrDefault(), r.EntMeta.PartitionOrDefault()) {
        r.Logger.Warn("removing self should be done by follower",
            "name", r.NodeName,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
            "reason", reason,
        )
        return nil
    }

    // Check if the workload exists
    workloadID := &pbresource.ID{
        Name:    fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
        Type:    pbcatalog.WorkloadType,
        Tenancy: resource.DefaultNamespacedTenancy(),
    }
    workloadID.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()

    res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadID})
    if err != nil && !grpcNotFoundErr(err) {
        return fmt.Errorf("error checking for existing Workload %s: %w", workloadID.Name, err)
    }
    if grpcNotFoundErr(err) {
        r.Logger.Info("ignoring reap event for member because it does not exist in the catalog",
            "member", member.Name,
            "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
        )
        return nil
    }
    existingWorkload := res.GetResource()

    // The HealthStatus should be reaped automatically
    if _, err := r.Client.Delete(context.TODO(), &pbresource.DeleteRequest{Id: existingWorkload.Id}); err != nil {
        return fmt.Errorf("failed to delete Workload %s: %w", existingWorkload.Id.Name, err)
    }
    r.Logger.Info("deleted consul Workload",
        "member", member.Name,
        "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
    )
    return err
}

func grpcNotFoundErr(err error) bool {
    if err == nil {
        return false
    }
    s, ok := status.FromError(err)
    return ok && s.Code() == codes.NotFound
}
@@ -1,583 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul

import (
    "fmt"
    "net"
    "testing"

    "github.com/google/go-cmp/cmp"
    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/serf/serf"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/anypb"

    "github.com/hashicorp/consul/acl"
    "github.com/hashicorp/consul/agent/metadata"
    "github.com/hashicorp/consul/agent/structs"
    mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
    "github.com/hashicorp/consul/internal/resource"
    pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
    "github.com/hashicorp/consul/proto-public/pbresource"
)

var (
    fakeWrappedErr = fmt.Errorf("fake test error")
)

type testCase struct {
    name             string
    member           serf.Member
    nodeNameOverride string // This is used in the HandleLeftMember test to avoid deregistering ourself

    existingWorkload  *pbresource.Resource
    workloadReadErr   bool
    workloadWriteErr  bool
    workloadDeleteErr bool

    existingHealthStatus *pbresource.Resource
    healthstatusReadErr  bool
    healthstatusWriteErr bool

    mutatedWorkload     *pbresource.Resource // leaving one of these out means the mock expects not to have a write/delete called
    mutatedHealthStatus *pbresource.Resource
    expErr              string
}

func Test_HandleAliveMember(t *testing.T) {
    t.Parallel()

    run := func(t *testing.T, tt testCase) {
        client := mockpbresource.NewResourceServiceClient(t)
        mockClient := client.EXPECT()

        // Build mock expectations based on the order of HandleAliveMember resource calls
        setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
        setupWriteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadWriteErr)
        if !tt.workloadReadErr && !tt.workloadWriteErr {
            // We expect to bail before this read if there is an error earlier in the function
            setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr)
        }
        setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr)

        registrator := V2ConsulRegistrator{
            Logger:   hclog.New(&hclog.LoggerOptions{}),
            NodeName: "test-server-1",
            Client:   client,
        }

        // Mock join function
        var joinMockCalled bool
        joinMock := func(_ serf.Member, _ *metadata.Server) error {
            joinMockCalled = true
            return nil
        }

        err := registrator.HandleAliveMember(tt.member, acl.DefaultEnterpriseMeta(), joinMock)
        if tt.expErr != "" {
            require.Contains(t, err.Error(), tt.expErr)
        } else {
            require.NoError(t, err)
        }
        require.True(t, joinMockCalled, "the mock join function was not called")
    }

    tests := []testCase{
        {
            name:                "New alive member",
            member:              getTestSerfMember(serf.StatusAlive),
            mutatedWorkload:     getTestWorkload(t),
            mutatedHealthStatus: getTestHealthStatus(t, true),
        },
        {
            name:                 "No updates needed",
            member:               getTestSerfMember(serf.StatusAlive),
            existingWorkload:     getTestWorkload(t),
            existingHealthStatus: getTestHealthStatus(t, true),
        },
        {
            name:                 "Existing Workload and HS need to be updated",
            member:               getTestSerfMember(serf.StatusAlive),
            existingWorkload:     getTestWorkloadWithPort(t, 8301),
            existingHealthStatus: getTestHealthStatus(t, false),
            mutatedWorkload:      getTestWorkload(t),
            mutatedHealthStatus:  getTestHealthStatus(t, true),
        },
        {
            name:                 "Only the HS needs to be updated",
            member:               getTestSerfMember(serf.StatusAlive),
            existingWorkload:     getTestWorkload(t),
            existingHealthStatus: getTestHealthStatus(t, false),
            mutatedHealthStatus:  getTestHealthStatus(t, true),
        },
        {
            name:            "Error reading Workload",
            member:          getTestSerfMember(serf.StatusAlive),
            workloadReadErr: true,
            expErr:          "error checking for existing Workload",
        },
        {
            name:             "Error writing Workload",
            member:           getTestSerfMember(serf.StatusAlive),
            workloadWriteErr: true,
            mutatedWorkload:  getTestWorkload(t),
            expErr:           "failed to write Workload",
        },
        {
            name:                "Error reading HealthStatus",
            member:              getTestSerfMember(serf.StatusAlive),
            healthstatusReadErr: true,
            mutatedWorkload:     getTestWorkload(t),
            expErr:              "error checking for existing HealthStatus",
        },
        {
            name:                 "Error writing HealthStatus",
            member:               getTestSerfMember(serf.StatusAlive),
            healthstatusWriteErr: true,
            mutatedWorkload:      getTestWorkload(t),
            mutatedHealthStatus:  getTestHealthStatus(t, true),
            expErr:               "failed to write HealthStatus",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            run(t, tt)
        })
    }
}

func Test_HandleFailedMember(t *testing.T) {
    t.Parallel()

    run := func(t *testing.T, tt testCase) {
        client := mockpbresource.NewResourceServiceClient(t)
        mockClient := client.EXPECT()

        // Build mock expectations based on the order of HandleFailedMember resource calls
        setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
        if !tt.workloadReadErr && tt.existingWorkload != nil {
            // We expect to bail before this read if there is an error earlier in the function or there is no workload
            setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr)
        }
        setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr)

        registrator := V2ConsulRegistrator{
            Logger:   hclog.New(&hclog.LoggerOptions{}),
            NodeName: "test-server-1",
            Client:   client,
        }

        err := registrator.HandleFailedMember(tt.member, acl.DefaultEnterpriseMeta())
        if tt.expErr != "" {
            require.Contains(t, err.Error(), tt.expErr)
        } else {
            require.NoError(t, err)
        }
    }

    tests := []testCase{
        {
            name:                "Update non-existent HealthStatus",
            member:              getTestSerfMember(serf.StatusFailed),
            existingWorkload:    getTestWorkload(t),
            mutatedHealthStatus: getTestHealthStatus(t, false),
        },
        {
            name:   "Underlying Workload does not exist",
            member: getTestSerfMember(serf.StatusFailed),
        },
        {
            name:                 "Update an existing HealthStatus",
            member:               getTestSerfMember(serf.StatusFailed),
            existingWorkload:     getTestWorkload(t),
            existingHealthStatus: getTestHealthStatus(t, true),
            mutatedHealthStatus:  getTestHealthStatus(t, false),
        },
        {
            name:                 "HealthStatus is already critical - no updates needed",
            member:               getTestSerfMember(serf.StatusFailed),
            existingWorkload:     getTestWorkload(t),
            existingHealthStatus: getTestHealthStatus(t, false),
        },
        {
            name:            "Error reading Workload",
            member:          getTestSerfMember(serf.StatusFailed),
            workloadReadErr: true,
            expErr:          "error checking for existing Workload",
        },
        {
            name:                "Error reading HealthStatus",
            member:              getTestSerfMember(serf.StatusFailed),
            existingWorkload:    getTestWorkload(t),
            healthstatusReadErr: true,
            expErr:              "error checking for existing HealthStatus",
        },
        {
            name:                 "Error writing HealthStatus",
            member:               getTestSerfMember(serf.StatusFailed),
            existingWorkload:     getTestWorkload(t),
            healthstatusWriteErr: true,
            mutatedHealthStatus:  getTestHealthStatus(t, false),
            expErr:               "failed to write HealthStatus",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            run(t, tt)
        })
    }
}

// Test_HandleLeftMember also tests HandleReapMember, which shares the same core logic with some different logs.
func Test_HandleLeftMember(t *testing.T) {
    t.Parallel()

    run := func(t *testing.T, tt testCase) {
        client := mockpbresource.NewResourceServiceClient(t)
        mockClient := client.EXPECT()

        // Build mock expectations based on the order of HandleLeftMember resource calls
        // We check for the override, which we use to skip self de-registration
        if tt.nodeNameOverride == "" {
            setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
            if tt.existingWorkload != nil && !tt.workloadReadErr {
                setupDeleteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadDeleteErr)
            }
        }

        nodeName := "test-server-2" // This is not the same as the serf node so we don't deregister ourself.
        if tt.nodeNameOverride != "" {
            nodeName = tt.nodeNameOverride
        }

        registrator := V2ConsulRegistrator{
            Logger:   hclog.New(&hclog.LoggerOptions{}),
            NodeName: nodeName, // We change this so that we don't deregister ourself
            Client:   client,
        }

        // Mock remove function
        var removeMockCalled bool
        removeMock := func(_ serf.Member) error {
            removeMockCalled = true
            return nil
        }

        err := registrator.HandleLeftMember(tt.member, acl.DefaultEnterpriseMeta(), removeMock)
        if tt.expErr != "" {
            require.Contains(t, err.Error(), tt.expErr)
        } else {
            require.NoError(t, err)
        }
        require.True(t, removeMockCalled, "the mock remove function was not called")
    }

    tests := []testCase{
        {
            name:             "Remove member",
            member:           getTestSerfMember(serf.StatusAlive),
            existingWorkload: getTestWorkload(t),
            mutatedWorkload:  getTestWorkload(t),
        },
        {
            name:             "Don't deregister ourself",
            member:           getTestSerfMember(serf.StatusAlive),
            nodeNameOverride: "test-server-1",
        },
        {
            name:   "Don't do anything if the Workload is already gone",
            member: getTestSerfMember(serf.StatusAlive),
        },
        {
            name:             "Remove member regardless of Workload payload",
            member:           getTestSerfMember(serf.StatusAlive),
            existingWorkload: getTestWorkloadWithPort(t, 8301),
            mutatedWorkload:  getTestWorkload(t),
        },
        {
            name:            "Error reading Workload",
            member:          getTestSerfMember(serf.StatusAlive),
            workloadReadErr: true,
            expErr:          "error checking for existing Workload",
        },
        {
            name:              "Error deleting Workload",
            member:            getTestSerfMember(serf.StatusAlive),
            workloadDeleteErr: true,
            existingWorkload:  getTestWorkloadWithPort(t, 8301),
            mutatedWorkload:   getTestWorkload(t),
            expErr:            "failed to delete Workload",
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            run(t, tt)
        })
    }
}

func setupReadExpectation(
    t *testing.T,
    mockClient *mockpbresource.ResourceServiceClient_Expecter,
    expectedId *pbresource.ID,
    existingResource *pbresource.Resource,
    sendErr bool) {

    if sendErr {
        mockClient.Read(mock.Anything, mock.Anything).
            Return(nil, fakeWrappedErr).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.ReadRequest)
                require.True(t, proto.Equal(expectedId, req.Id))
            })
    } else if existingResource != nil {
        mockClient.Read(mock.Anything, mock.Anything).
            Return(&pbresource.ReadResponse{
                Resource: existingResource,
            }, nil).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.ReadRequest)
                require.True(t, proto.Equal(expectedId, req.Id))
            })
    } else {
        mockClient.Read(mock.Anything, mock.Anything).
            Return(nil, status.Error(codes.NotFound, "not found")).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.ReadRequest)
                require.True(t, proto.Equal(expectedId, req.Id))
            })
    }
}

func setupWriteExpectation(
    t *testing.T,
    mockClient *mockpbresource.ResourceServiceClient_Expecter,
    expectedResource *pbresource.Resource,
    sendErr bool) {

    // If there is no expected resource, we take that to mean we don't expect any client writes.
    if expectedResource == nil {
        return
    }

    if sendErr {
        mockClient.Write(mock.Anything, mock.Anything).
            Return(nil, fakeWrappedErr).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.WriteRequest)
                require.True(t, proto.Equal(expectedResource, req.Resource))
            })
    } else {
        mockClient.Write(mock.Anything, mock.Anything).
            Return(nil, nil).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.WriteRequest)
                require.True(t, proto.Equal(expectedResource, req.Resource))
            })
    }
}

func setupDeleteExpectation(
    t *testing.T,
    mockClient *mockpbresource.ResourceServiceClient_Expecter,
    expectedResource *pbresource.Resource,
    sendErr bool) {

    expectedId := expectedResource.GetId()

    if sendErr {
        mockClient.Delete(mock.Anything, mock.Anything).
            Return(nil, fakeWrappedErr).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.DeleteRequest)
                require.True(t, proto.Equal(expectedId, req.Id))
            })
    } else {
        mockClient.Delete(mock.Anything, mock.Anything).
            Return(nil, nil).
            Once().
            Run(func(args mock.Arguments) {
                req := args.Get(1).(*pbresource.DeleteRequest)
                require.True(t, proto.Equal(expectedId, req.Id))
            })
    }
}

func getTestWorkload(t *testing.T) *pbresource.Resource {
    return getTestWorkloadWithPort(t, 8300)
}

func getTestWorkloadWithPort(t *testing.T, port int) *pbresource.Resource {
    workload := &pbcatalog.Workload{
        Addresses: []*pbcatalog.WorkloadAddress{
            {Host: "127.0.0.1", Ports: []string{consulPortNameServer}},
        },
        Ports: map[string]*pbcatalog.WorkloadPort{
            consulPortNameServer: {
                Port:     uint32(port),
                Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
            },
        },
    }
    data, err := anypb.New(workload)
    require.NoError(t, err)

    return &pbresource.Resource{
        Id:   getTestWorkloadId(),
        Data: data,
        Metadata: map[string]string{
            "read_replica":          "false",
            "raft_version":          "3",
            "serf_protocol_current": "2",
            "serf_protocol_min":     "1",
            "serf_protocol_max":     "5",
            "version":               "1.18.0",
            "grpc_port":             "8502",
        },
    }
}

func getTestWorkloadId() *pbresource.ID {
    return &pbresource.ID{
        Tenancy: resource.DefaultNamespacedTenancy(),
        Type:    pbcatalog.WorkloadType,
        Name:    "consul-server-72af047d-1857-2493-969e-53614a70b25a",
    }
}

func getTestHealthStatus(t *testing.T, passing bool) *pbresource.Resource {
    healthStatus := &pbcatalog.HealthStatus{
        Type:        string(structs.SerfCheckID),
        Description: structs.SerfCheckName,
    }

    if passing {
        healthStatus.Status = pbcatalog.Health_HEALTH_PASSING
        healthStatus.Output = structs.SerfCheckAliveOutput
    } else {
        healthStatus.Status = pbcatalog.Health_HEALTH_CRITICAL
        healthStatus.Output = structs.SerfCheckFailedOutput
    }

    data, err := anypb.New(healthStatus)
    require.NoError(t, err)

    return &pbresource.Resource{
        Id:    getTestHealthstatusId(),
        Data:  data,
        Owner: getTestWorkloadId(),
    }
}

func getTestHealthstatusId() *pbresource.ID {
    return &pbresource.ID{
        Tenancy: resource.DefaultNamespacedTenancy(),
        Type:    pbcatalog.HealthStatusType,
        Name:    "consul-server-72af047d-1857-2493-969e-53614a70b25a",
    }
}

func getTestSerfMember(status serf.MemberStatus) serf.Member {
    return serf.Member{
        Name: "test-server-1",
        Addr: net.ParseIP("127.0.0.1"),
        Port: 8300,
        // representative tags from a local dev deployment of ENT
        Tags: map[string]string{
            "vsn_min":       "2",
            "vsn":           "2",
            "acls":          "1",
            "ft_si":         "1",
            "raft_vsn":      "3",
            "grpc_port":     "8502",
            "wan_join_port": "8500",
            "dc":            "dc1",
            "segment":       "",
            "id":            "72af047d-1857-2493-969e-53614a70b25a",
            "ft_admpart":    "1",
            "role":          "consul",
            "build":         "1.18.0",
            "ft_ns":         "1",
            "vsn_max":       "3",
            "bootstrap":     "1",
            "expect":        "1",
            "port":          "8300",
        },
        Status:      status,
        ProtocolMin: 1,
        ProtocolMax: 5,
        ProtocolCur: 2,
        DelegateMin: 2,
        DelegateMax: 5,
        DelegateCur: 4,
    }
}

// Test_ResourceCmpOptions_GeneratedFieldInsensitive makes sure our protocmp options are working as expected.
func Test_ResourceCmpOptions_GeneratedFieldInsensitive(t *testing.T) {
    t.Parallel()

    res1 := getTestWorkload(t)
    res2 := getTestWorkload(t)

    // Modify the generated fields
    res2.Id.Uid = "123456"
    res2.Version = "789"
    res2.Generation = "millenial"
    res2.Status = map[string]*pbresource.Status{
        "foo": {ObservedGeneration: "124"},
    }

    require.True(t, cmp.Equal(res1, res2, resourceCmpOptions...))

    res1.Metadata["foo"] = "bar"

    require.False(t, cmp.Equal(res1, res2, resourceCmpOptions...))
}

// Test_grpcNotFoundErr checks the gRPC error code conditions.
func Test_grpcNotFoundErr(t *testing.T) {
    t.Parallel()
    tests := []struct {
        name     string
        err      error
        expected bool
    }{
        {
            name: "Nil Error",
        },
        {
            name: "Nonsense Error",
            err:  fmt.Errorf("boooooo!"),
        },
        {
            name: "gRPC Permission Denied Error",
            err:  status.Error(codes.PermissionDenied, "permission denied is not NotFound"),
        },
        {
            name:     "gRPC NotFound Error",
            err:      status.Error(codes.NotFound, "bingo: not found"),
            expected: true,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            require.Equal(t, tt.expected, grpcNotFoundErr(tt.err))
        })
    }
}
@@ -1,15 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package resource

import "github.com/hashicorp/consul/proto-public/pbresource"

func blockBuiltinsDeletion(rtype *pbresource.Type, id *pbresource.ID) error {
    if err := blockDefaultNamespaceDeletion(rtype, id); err != nil {
        return err
    }
    return nil
}
@@ -1,15 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package catalog

import (
    "github.com/hashicorp/consul/acl"
    "github.com/hashicorp/consul/proto-public/pbresource"
)

func GetEnterpriseMetaFromResourceID(id *pbresource.ID) *acl.EnterpriseMeta {
    return acl.DefaultEnterpriseMeta()
}
@@ -1,3 +0,0 @@
identity "{{.Name}}" {
    policy = "write"
}
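The deleted ACL policy template above is a Go text/template body for the removed workload-identity rules. A minimal sketch of how such a template renders (the "web" identity name and the rendering harness below are illustrative only, not taken from the Consul source):

```go
package main

import (
    "os"
    "text/template"
)

func main() {
    // Same template body as the deleted file above.
    const identityPolicyTmpl = `identity "{{.Name}}" {
    policy = "write"
}`
    tmpl := template.Must(template.New("identity").Parse(identityPolicyTmpl))

    // Substitutes {{.Name}} with "web", producing an identity rule granting write policy.
    if err := tmpl.Execute(os.Stdout, struct{ Name string }{Name: "web"}); err != nil {
        panic(err)
    }
}
```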
@@ -1,13 +0,0 @@
{
    "type": "object",
    "properties": {
        "name": { "type": "string", "$ref": "#/definitions/min-length-one" }
    },
    "required": ["name"],
    "definitions": {
        "min-length-one": {
            "type": "string",
            "minLength": 1
        }
    }
}
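The deleted schema above only requires an object with a non-empty "name" string. A small Go sketch of the same constraint, expressed in plain code rather than with a JSON Schema library (the templateArgs type and parseTemplateArgs helper are hypothetical names, not from the Consul source):

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// templateArgs mirrors the schema: a single required "name" string.
type templateArgs struct {
    Name string `json:"name"`
}

// parseTemplateArgs enforces the schema's "required" and minLength 1 rules.
func parseTemplateArgs(raw []byte) (*templateArgs, error) {
    var args templateArgs
    if err := json.Unmarshal(raw, &args); err != nil {
        return nil, err
    }
    if len(args.Name) < 1 {
        return nil, errors.New(`"name" is required and must be non-empty`)
    }
    return &args, nil
}

func main() {
    args, err := parseTemplateArgs([]byte(`{"name": "web"}`))
    if err != nil {
        panic(err)
    }
    fmt.Println(args.Name) // valid payload: prints "web"

    if _, err := parseTemplateArgs([]byte(`{"name": ""}`)); err != nil {
        fmt.Println(err) // rejected, mirroring minLength 1
    }
}
```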
File diff suppressed because it is too large
@@ -1,135 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package proxystateconverter

import (
    "fmt"

    "github.com/hashicorp/go-hclog"

    "github.com/hashicorp/consul/agent/proxycfg"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/agent/xds/configfetcher"
    proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
    "github.com/hashicorp/consul/internal/resource"
    pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
    "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate"
)

// Converter converts a single snapshot into a ProxyState.
type Converter struct {
    Logger     hclog.Logger
    CfgFetcher configfetcher.ConfigFetcher
    proxyState *proxytracker.ProxyState
}

func NewConverter(
    logger hclog.Logger,
    cfgFetcher configfetcher.ConfigFetcher,
) *Converter {
    return &Converter{
        Logger:     logger,
        CfgFetcher: cfgFetcher,
        proxyState: &proxytracker.ProxyState{
            ProxyState: &pbmesh.ProxyState{
                Listeners: make([]*pbproxystate.Listener, 0),
                Clusters:  make(map[string]*pbproxystate.Cluster),
                Routes:    make(map[string]*pbproxystate.Route),
                Endpoints: make(map[string]*pbproxystate.Endpoints),
            },
        },
    }
}

func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*proxytracker.ProxyState, error) {
    err := g.resourcesFromSnapshot(cfgSnap)
    if err != nil {
        return nil, fmt.Errorf("failed to generate FullProxyState: %v", err)
    }

    return g.proxyState, nil
}

func (g *Converter) resourcesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) error {
    err := g.tlsConfigFromSnapshot(cfgSnap)
    if err != nil {
        return err
    }

    err = g.listenersFromSnapshot(cfgSnap)
    if err != nil {
        return err
    }

    err = g.endpointsFromSnapshot(cfgSnap)
    if err != nil {
        return err
    }
    err = g.clustersFromSnapshot(cfgSnap)
    if err != nil {
        return err
    }
    err = g.routesFromSnapshot(cfgSnap)
    if err != nil {
        return err
    }

    //g.secretsFromSnapshot(cfgSnap)
    return nil
}

func (g *Converter) tlsConfigFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) error {
    proxyStateTLS := &pbproxystate.TLS{}
    g.proxyState.TrustBundles = make(map[string]*pbproxystate.TrustBundle)
    g.proxyState.LeafCertificates = make(map[string]*pbproxystate.LeafCertificate)

    // Set the TLS in the top level proxyState
    g.proxyState.Tls = proxyStateTLS

    // Add local trust bundle
    g.proxyState.TrustBundles[resource.DefaultPeerName] = &pbproxystate.TrustBundle{
        TrustDomain: cfgSnap.Roots.TrustDomain,
        Roots:       []string{cfgSnap.RootPEMs()},
    }

    // Add peered trust bundles for remote peers that will dial this proxy.
    for _, peeringTrustBundle := range cfgSnap.PeeringTrustBundles() {
        g.proxyState.TrustBundles[peeringTrustBundle.PeerName] = &pbproxystate.TrustBundle{
            TrustDomain: peeringTrustBundle.GetTrustDomain(),
            Roots:       peeringTrustBundle.RootPEMs,
        }
    }

    // Add upstream peer trust bundles for dialing upstreams in remote peers.
    upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
    if err != nil {
        if !(cfgSnap.Kind == structs.ServiceKindMeshGateway || cfgSnap.Kind == structs.ServiceKindTerminatingGateway) {
            return err
        }
    }
    if upstreamsSnapshot != nil {
        upstreamsSnapshot.UpstreamPeerTrustBundles.ForEachKeyE(func(k proxycfg.PeerName) error {
            tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(k)
            if ok {
                g.proxyState.TrustBundles[k] = &pbproxystate.TrustBundle{
                    TrustDomain: tbs.TrustDomain,
                    Roots:       tbs.RootPEMs,
                }
            }
            return nil
        })
    }

    if cfgSnap.MeshConfigTLSOutgoing() != nil {
        proxyStateTLS.OutboundTlsParameters = makeTLSParametersFromTLSConfig(cfgSnap.MeshConfigTLSOutgoing().TLSMinVersion,
            cfgSnap.MeshConfigTLSOutgoing().TLSMaxVersion, cfgSnap.MeshConfigTLSOutgoing().CipherSuites)
    }

    if cfgSnap.MeshConfigTLSIncoming() != nil {
        proxyStateTLS.InboundTlsParameters = makeTLSParametersFromTLSConfig(cfgSnap.MeshConfigTLSIncoming().TLSMinVersion,
            cfgSnap.MeshConfigTLSIncoming().TLSMaxVersion, cfgSnap.MeshConfigTLSIncoming().CipherSuites)
    }

    return nil
}
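A minimal usage sketch for the deleted Converter above, assuming a *proxycfg.ConfigSnapshot and a configfetcher.ConfigFetcher are already available from the agent; the buildProxyState helper is hypothetical and was not part of the original file:

```go
package proxystateconverter

import (
    "github.com/hashicorp/go-hclog"

    "github.com/hashicorp/consul/agent/proxycfg"
    "github.com/hashicorp/consul/agent/xds/configfetcher"
    proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
)

// buildProxyState wires up a Converter for a single snapshot and returns the
// converted ProxyState. In the removed v2 code path the snapshot came from the
// proxycfg manager and the result was handed to the proxy-tracker / xDS layer.
func buildProxyState(
    logger hclog.Logger,
    fetcher configfetcher.ConfigFetcher,
    cfgSnap *proxycfg.ConfigSnapshot,
) (*proxytracker.ProxyState, error) {
    converter := NewConverter(logger, fetcher)
    // ProxyStateFromSnapshot walks TLS, listener, endpoint, cluster, and route
    // state out of the snapshot, mirroring resourcesFromSnapshot above.
    return converter.ProxyStateFromSnapshot(cfgSnap)
}
```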
@ -1,674 +0,0 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package proxystateconverter |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
|
||||
"github.com/hashicorp/consul/agent/connect" |
||||
"github.com/hashicorp/consul/agent/proxycfg" |
||||
"github.com/hashicorp/consul/agent/structs" |
||||
"github.com/hashicorp/consul/agent/xds/response" |
||||
"github.com/hashicorp/consul/api" |
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate" |
||||
"github.com/hashicorp/go-bexpr" |
||||
|
||||
"google.golang.org/protobuf/types/known/wrapperspb" |
||||
) |
||||
|
||||
func makeLbEndpoint(addr string, port int, health pbproxystate.HealthStatus, weight int) *pbproxystate.Endpoint { |
||||
ep := &pbproxystate.Endpoint{ |
||||
Address: &pbproxystate.Endpoint_HostPort{ |
||||
HostPort: &pbproxystate.HostPortAddress{ |
||||
Host: addr, |
||||
Port: uint32(port), |
||||
}, |
||||
}, |
||||
} |
||||
ep.HealthStatus = health |
||||
ep.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: uint32(weight)} |
||||
return ep |
||||
} |
||||
|
||||
// endpointsFromSnapshot returns the mesh API representation of the "routes" in the snapshot.
|
||||
func (s *Converter) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) error { |
||||
|
||||
if cfgSnap == nil { |
||||
return errors.New("nil config given") |
||||
} |
||||
|
||||
switch cfgSnap.Kind { |
||||
case structs.ServiceKindConnectProxy: |
||||
return s.endpointsFromSnapshotConnectProxy(cfgSnap) |
||||
//case structs.ServiceKindTerminatingGateway:
|
||||
// return s.endpointsFromSnapshotTerminatingGateway(cfgSnap)
|
||||
//case structs.ServiceKindMeshGateway:
|
||||
// return s.endpointsFromSnapshotMeshGateway(cfgSnap)
|
||||
//case structs.ServiceKindIngressGateway:
|
||||
// return s.endpointsFromSnapshotIngressGateway(cfgSnap)
|
||||
//case structs.ServiceKindAPIGateway:
|
||||
// return s.endpointsFromSnapshotAPIGateway(cfgSnap)
|
||||
default: |
||||
return fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) |
||||
} |
||||
} |
||||
|
||||
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
|
||||
// (upstream instances) in the snapshot.
|
||||
func (s *Converter) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) error { |
||||
eps := make(map[string]*pbproxystate.Endpoints) |
||||
|
||||
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
|
||||
// so that the sets of endpoints generated matches the sets of clusters.
|
||||
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { |
||||
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta) |
||||
if skip { |
||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
||||
continue |
||||
} |
||||
|
||||
var upstreamConfigMap map[string]interface{} |
||||
if upstream != nil { |
||||
upstreamConfigMap = upstream.Config |
||||
} |
||||
|
||||
es, err := s.endpointsFromDiscoveryChain( |
||||
uid, |
||||
chain, |
||||
cfgSnap, |
||||
cfgSnap.Locality, |
||||
upstreamConfigMap, |
||||
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid], |
||||
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid], |
||||
false, |
||||
) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for clusterName, endpoints := range es { |
||||
eps[clusterName] = &pbproxystate.Endpoints{ |
||||
Endpoints: endpoints, |
||||
} |
||||
|
||||
} |
||||
} |
||||
|
||||
// NOTE: Any time we skip an upstream below we MUST also skip that same
|
||||
// upstream in clusters.go so that the sets of endpoints generated matches
|
||||
// the sets of clusters.
|
||||
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() { |
||||
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta) |
||||
if skip { |
||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
||||
continue |
||||
} |
||||
|
||||
tbs, ok := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer) |
||||
if !ok { |
||||
// this should never happen since we loop through upstreams with
|
||||
// set trust bundles
|
||||
return fmt.Errorf("trust bundle not ready for peer %s", uid.Peer) |
||||
} |
||||
|
||||
clusterName := generatePeeredClusterName(uid, tbs) |
||||
|
||||
mgwMode := structs.MeshGatewayModeDefault |
||||
if upstream != nil { |
||||
mgwMode = upstream.MeshGateway.Mode |
||||
} |
||||
peerServiceEndpoints, err := s.makeEndpointsForPeerService(cfgSnap, uid, mgwMode) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if peerServiceEndpoints != nil { |
||||
pbEndpoints := &pbproxystate.Endpoints{ |
||||
Endpoints: peerServiceEndpoints, |
||||
} |
||||
|
||||
eps[clusterName] = pbEndpoints |
||||
} |
||||
} |
||||
|
||||
// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
|
||||
for _, u := range cfgSnap.Proxy.Upstreams { |
||||
if u.DestinationType != structs.UpstreamDestTypePreparedQuery { |
||||
continue |
||||
} |
||||
uid := proxycfg.NewUpstreamID(&u) |
||||
|
||||
dc := u.Datacenter |
||||
if dc == "" { |
||||
dc = cfgSnap.Datacenter |
||||
} |
||||
clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain) |
||||
|
||||
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] |
||||
if ok { |
||||
epts := makeEndpointsForLoadAssignment( |
||||
cfgSnap, |
||||
nil, |
||||
[]loadAssignmentEndpointGroup{ |
||||
{Endpoints: endpoints}, |
||||
}, |
||||
cfgSnap.Locality, |
||||
) |
||||
pbEndpoints := &pbproxystate.Endpoints{ |
||||
Endpoints: epts, |
||||
} |
||||
|
||||
eps[clusterName] = pbEndpoints |
||||
} |
||||
} |
||||
|
||||
// Loop over potential destinations in the mesh, then grab the gateway nodes associated with each
|
||||
cfgSnap.ConnectProxy.DestinationsUpstream.ForEachKey(func(uid proxycfg.UpstreamID) bool { |
||||
svcConfig, ok := cfgSnap.ConnectProxy.DestinationsUpstream.Get(uid) |
||||
if !ok || svcConfig.Destination == nil { |
||||
return true |
||||
} |
||||
|
||||
for _, address := range svcConfig.Destination.Addresses { |
||||
clusterName := clusterNameForDestination(cfgSnap, uid.Name, address, uid.NamespaceOrDefault(), uid.PartitionOrDefault()) |
||||
|
||||
endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid) |
||||
if ok { |
||||
epts := makeEndpointsForLoadAssignment( |
||||
cfgSnap, |
||||
nil, |
||||
[]loadAssignmentEndpointGroup{ |
||||
{Endpoints: endpoints}, |
||||
}, |
||||
proxycfg.GatewayKey{ /*empty so it never matches*/ }, |
||||
) |
||||
pbEndpoints := &pbproxystate.Endpoints{ |
||||
Endpoints: epts, |
||||
} |
||||
eps[clusterName] = pbEndpoints |
||||
} |
||||
} |
||||
|
||||
return true |
||||
}) |
||||
|
||||
s.proxyState.Endpoints = eps |
||||
return nil |
||||
} |
||||
|
||||
func (s *Converter) makeEndpointsForPeerService( |
||||
cfgSnap *proxycfg.ConfigSnapshot, |
||||
uid proxycfg.UpstreamID, |
||||
upstreamGatewayMode structs.MeshGatewayMode, |
||||
) ([]*pbproxystate.Endpoint, error) { |
||||
var eps []*pbproxystate.Endpoint |
||||
|
||||
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() |
||||
if err != nil { |
||||
return eps, err |
||||
} |
||||
|
||||
if upstreamGatewayMode == structs.MeshGatewayModeNone { |
||||
s.Logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) |
||||
} |
||||
|
||||
// If an upstream is configured with local mesh gw mode, we make a load assignment
|
||||
// from the gateway endpoints instead of those of the upstreams.
|
||||
if upstreamGatewayMode == structs.MeshGatewayModeLocal { |
||||
localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) |
||||
if !ok { |
||||
// local GW is not ready; return early
|
||||
return eps, nil |
||||
} |
||||
eps = makeEndpointsForLoadAssignment( |
||||
cfgSnap, |
||||
nil, |
||||
[]loadAssignmentEndpointGroup{ |
||||
{Endpoints: localGw}, |
||||
}, |
||||
cfgSnap.Locality, |
||||
) |
||||
return eps, nil |
||||
} |
||||
|
||||
// Also skip peer instances with a hostname as their address. EDS
|
||||
// cannot resolve hostnames, so we provide them through CDS instead.
|
||||
if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid]; ok { |
||||
return eps, nil |
||||
} |
||||
|
||||
endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid) |
||||
if !ok { |
||||
return nil, nil |
||||
} |
||||
eps = makeEndpointsForLoadAssignment( |
||||
cfgSnap, |
||||
nil, |
||||
[]loadAssignmentEndpointGroup{ |
||||
{Endpoints: endpoints}, |
||||
}, |
||||
proxycfg.GatewayKey{ /*empty so it never matches*/ }, |
||||
) |
||||
return eps, nil |
||||
} |
||||
|
||||
func (s *Converter) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) { |
||||
// locally execute the subsets filter
|
||||
if subset.Filter != "" { |
||||
filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
raw, err := filter.Execute(endpoints) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return raw.(structs.CheckServiceNodes), nil |
||||
} |
||||
return endpoints, nil |
||||
} |
||||

// TODO(proxystate): Terminating Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func endpointsFromSnapshotTerminatingGateway

// TODO(proxystate): Mesh Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func endpointsFromSnapshotMeshGateway

// TODO(proxystate): Cluster Peering will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func makeEndpointsForOutgoingPeeredServices

// TODO(proxystate): Mesh Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func endpointsFromServicesAndResolvers

// TODO(proxystate): Mesh Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func makePeerServerEndpointsForMeshGateway

// TODO(proxystate): Ingress Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func endpointsFromSnapshotIngressGateway

// TODO(proxystate): API Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func endpointsFromSnapshotAPIGateway

// used in clusters.go
func makeHostPortEndpoint(host string, port int) *pbproxystate.Endpoint {
	if port >= 0 && port <= 65535 {
		return &pbproxystate.Endpoint{
			Address: &pbproxystate.Endpoint_HostPort{
				HostPort: &pbproxystate.HostPortAddress{
					Host: host,
					Port: uint32(port),
				},
			},
		}
	}
	return nil
}

func makeUnixSocketEndpoint(path string) *pbproxystate.Endpoint {
	return &pbproxystate.Endpoint{
		Address: &pbproxystate.Endpoint_UnixSocket{
			UnixSocket: &pbproxystate.UnixSocketAddress{
				Path: path,
				// Envoy's mode is particular to a pipe address and is uint32.
				// It also says "The mode for the Pipe. Not applicable for abstract sockets."
				// https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/core/v3/address.proto#config-core-v3-pipe
				Mode: "0",
			},
		},
	}
}
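A brief illustrative sketch of the two address shapes these constructors produce; the function name, host, port, and socket path below are made-up values, not part of the diff:

```go
// exampleEndpoints is a hypothetical helper, purely illustrative.
func exampleEndpoints() []*pbproxystate.Endpoint {
	return []*pbproxystate.Endpoint{
		// TCP endpoint: an in-range port yields a HostPort address.
		makeHostPortEndpoint("10.0.0.12", 8443),
		// Unix socket endpoint: addressed by filesystem path instead of host and port.
		makeUnixSocketEndpoint("/var/run/example-upstream.sock"),
	}
}
```

Note that makeHostPortEndpoint returns nil for an out-of-range port, so callers are expected to supply already-validated ports.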
func (s *Converter) makeUpstreamLoadAssignmentEndpointForPeerService(
	cfgSnap *proxycfg.ConfigSnapshot,
	uid proxycfg.UpstreamID,
	upstreamGatewayMode structs.MeshGatewayMode,
) ([]*pbproxystate.Endpoint, error) {
	var eps []*pbproxystate.Endpoint

	upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
	if err != nil {
		return eps, err
	}

	if upstreamGatewayMode == structs.MeshGatewayModeNone {
		s.Logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
	}

	// If an upstream is configured with local mesh gw mode, we make a load assignment
	// from the gateway endpoints instead of those of the upstreams.
	if upstreamGatewayMode == structs.MeshGatewayModeLocal {
		localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
		if !ok {
			// local GW is not ready; return early
			return eps, nil
		}
		eps = makeEndpointsForLoadAssignment(
			cfgSnap,
			nil,
			[]loadAssignmentEndpointGroup{
				{Endpoints: localGw},
			},
			cfgSnap.Locality,
		)
		return eps, nil
	}

	// Also skip peer instances with a hostname as their address. EDS
	// cannot resolve hostnames, so we provide them through CDS instead.
	if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid]; ok {
		return eps, nil
	}

	endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid)
	if !ok {
		return nil, nil
	}
	eps = makeEndpointsForLoadAssignment(
		cfgSnap,
		nil,
		[]loadAssignmentEndpointGroup{
			{Endpoints: endpoints},
		},
		proxycfg.GatewayKey{ /*empty so it never matches*/ },
	)
	return eps, nil
}

func (s *Converter) endpointsFromDiscoveryChain(
	uid proxycfg.UpstreamID,
	chain *structs.CompiledDiscoveryChain,
	cfgSnap *proxycfg.ConfigSnapshot,
	gatewayKey proxycfg.GatewayKey,
	upstreamConfigMap map[string]interface{},
	upstreamEndpoints map[string]structs.CheckServiceNodes,
	gatewayEndpoints map[string]structs.CheckServiceNodes,
	forMeshGateway bool,
) (map[string][]*pbproxystate.Endpoint, error) {
	if chain == nil {
		if forMeshGateway {
			return nil, fmt.Errorf("missing discovery chain for %s", uid)
		}
		return nil, nil
	}

	if upstreamConfigMap == nil {
		upstreamConfigMap = make(map[string]interface{}) // TODO: needed?
	}

	clusterEndpoints := make(map[string][]*pbproxystate.Endpoint)

	// TODO(proxystate): escape hatches will be implemented in the future
	//var escapeHatchCluster *pbproxystate.Cluster
	//if !forMeshGateway {
	//	cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
	//	if err != nil {
	//		// Don't hard fail on a config typo, just warn. The parse func returns
	//		// default config if there is an error so it's safe to continue.
	//		s.Logger.Warn("failed to parse", "upstream", uid,
	//			"error", err)
	//	}

	//	if cfg.EnvoyClusterJSON != "" {
	//		if chain.Default {
	//			// If you haven't done anything to setup the discovery chain, then
	//			// you can use the envoy_cluster_json escape hatch.
	//			escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
	//			if err != nil {
	//				return ce, nil
	//			}
	//		} else {
	//			s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for",
	//				"discovery chain", chain.ServiceName, "upstream", uid,
	//				"envoy_cluster_json", chain.ServiceName)
	//		}
	//	}
	//}

	mgwMode := structs.MeshGatewayModeDefault
	if upstream, _ := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta); upstream != nil {
		mgwMode = upstream.MeshGateway.Mode
	}

	// Find all resolver nodes.
	for _, node := range chain.Nodes {
		switch {
		case node == nil:
			return nil, fmt.Errorf("impossible to process a nil node")
		case node.Type != structs.DiscoveryGraphNodeTypeResolver:
			continue
		case node.Resolver == nil:
			return nil, fmt.Errorf("impossible to process a non-resolver node")
		}
		rawUpstreamConfig, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
		if err != nil {
			return nil, err
		}
		upstreamConfig := finalizeUpstreamConfig(rawUpstreamConfig, chain, node.Resolver.ConnectTimeout)

		mappedTargets, err := s.mapDiscoChainTargets(cfgSnap, chain, node, upstreamConfig, forMeshGateway)
		if err != nil {
			return nil, err
		}

		targetGroups, err := mappedTargets.groupedTargets()
		if err != nil {
			return nil, err
		}

		for _, groupedTarget := range targetGroups {
			clusterName := groupedTarget.ClusterName
			// TODO(proxystate): escape hatches will be implemented in the future
			//if escapeHatchCluster != nil {
			//	clusterName = escapeHatchCluster.Name
			//}
			switch len(groupedTarget.Targets) {
			case 0:
				continue
			case 1:
				// We expect one target so this passes through to continue setting the load assignment up.
			default:
				return nil, fmt.Errorf("cannot have more than one target")
			}
			ti := groupedTarget.Targets[0]
			s.Logger.Debug("generating endpoints for", "cluster", clusterName, "targetID", ti.TargetID)
			targetUID := proxycfg.NewUpstreamIDFromTargetID(ti.TargetID)
			if targetUID.Peer != "" {
				peerServiceEndpoints, err := s.makeEndpointsForPeerService(cfgSnap, targetUID, mgwMode)
				if err != nil {
					return nil, err
				}
				if peerServiceEndpoints != nil {
					clusterEndpoints[clusterName] = peerServiceEndpoints
				}
				continue
			}

			endpointGroup, valid := makeLoadAssignmentEndpointGroup(
				chain.Targets,
				upstreamEndpoints,
				gatewayEndpoints,
				ti.TargetID,
				gatewayKey,
				forMeshGateway,
			)
			if !valid {
				continue // skip the cluster if we're still populating the snapshot
			}

			epts := makeEndpointsForLoadAssignment(
				cfgSnap,
				ti.PrioritizeByLocality,
				[]loadAssignmentEndpointGroup{endpointGroup},
				gatewayKey,
			)
			clusterEndpoints[clusterName] = epts
		}
	}

	return clusterEndpoints, nil
}
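For orientation, a hedged sketch of how a caller might consume the result; the helper name, argument choices, and logging are assumptions, and only the shape of the returned map (keyed by cluster name) comes from the function above:

```go
// logChainEndpoints is a hypothetical caller that reports how many endpoints
// were produced per cluster for a compiled discovery chain.
func logChainEndpoints(s *Converter, uid proxycfg.UpstreamID, chain *structs.CompiledDiscoveryChain,
	cfgSnap *proxycfg.ConfigSnapshot, upstreamEndpoints map[string]structs.CheckServiceNodes) error {
	clusterEndpoints, err := s.endpointsFromDiscoveryChain(
		uid, chain, cfgSnap, cfgSnap.Locality, nil, upstreamEndpoints, nil, false)
	if err != nil {
		return err
	}
	for clusterName, endpoints := range clusterEndpoints {
		s.Logger.Debug("built endpoints", "cluster", clusterName, "count", len(endpoints))
	}
	return nil
}
```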

// TODO(proxystate): Mesh Gateway will be added in the future.
// Functions to add from agent/xds/endpoints.go:
// func makeExportedUpstreamEndpointsForMeshGateway

type loadAssignmentEndpointGroup struct {
	Endpoints      structs.CheckServiceNodes
	OnlyPassing    bool
	OverrideHealth pbproxystate.HealthStatus
}

func makeEndpointsForLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot,
	policy *structs.DiscoveryPrioritizeByLocality,
	endpointGroups []loadAssignmentEndpointGroup,
	localKey proxycfg.GatewayKey) []*pbproxystate.Endpoint {
	pbEndpoints := make([]*pbproxystate.Endpoint, 0, len(endpointGroups))

	// TODO(proxystate): this will be added with property overrides having golden files with this
	//if len(endpointGroups) > 1 {
	//	cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{
	//		// We choose such a large value here that the failover math should
	//		// in effect not happen until zero instances are healthy.
	//		OverprovisioningFactor: response.MakeUint32Value(100000),
	//	}
	//}

	var priority uint32

	for _, endpointGroup := range endpointGroups {
		endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints)

		if err != nil {
			continue
		}

		for _, endpoints := range endpointsByLocality {
			for _, ep := range endpoints {
				// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
				_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
				healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)

				if endpointGroup.OverrideHealth != pbproxystate.HealthStatus_HEALTH_STATUS_UNKNOWN {
					healthStatus = endpointGroup.OverrideHealth
				}

				endpoint := makeHostPortEndpoint(addr, port)
				endpoint.HealthStatus = healthStatus
				endpoint.LoadBalancingWeight = response.MakeUint32Value(weight)

				pbEndpoints = append(pbEndpoints, endpoint)
			}

			// TODO(proxystate): what do we do about priority downstream?
			//cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
			//	Priority:    priority,
			//	LbEndpoints: es,
			//})

			priority++
		}
	}

	return pbEndpoints
}
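A minimal sketch of assembling endpoint groups for this function, assuming hypothetical `primaryNodes` and `failoverNodes` of type structs.CheckServiceNodes gathered elsewhere; later groups are simply evaluated after earlier ones, since priority propagation is still a TODO in this converter:

```go
// exampleFailoverAssignment is a hypothetical helper: a primary group followed by a
// failover group that only admits passing instances.
func exampleFailoverAssignment(cfgSnap *proxycfg.ConfigSnapshot,
	primaryNodes, failoverNodes structs.CheckServiceNodes) []*pbproxystate.Endpoint {
	groups := []loadAssignmentEndpointGroup{
		{Endpoints: primaryNodes},
		{Endpoints: failoverNodes, OnlyPassing: true},
	}
	return makeEndpointsForLoadAssignment(cfgSnap, nil, groups, cfgSnap.Locality)
}
```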

func makeLoadAssignmentEndpointGroup(
	targets map[string]*structs.DiscoveryTarget,
	targetHealth map[string]structs.CheckServiceNodes,
	gatewayHealth map[string]structs.CheckServiceNodes,
	targetID string,
	localKey proxycfg.GatewayKey,
	forMeshGateway bool,
) (loadAssignmentEndpointGroup, bool) {
	realEndpoints, ok := targetHealth[targetID]
	if !ok {
		// skip the cluster if we're still populating the snapshot
		return loadAssignmentEndpointGroup{}, false
	}
	target := targets[targetID]

	var gatewayKey proxycfg.GatewayKey

	switch target.MeshGateway.Mode {
	case structs.MeshGatewayModeRemote:
		gatewayKey.Datacenter = target.Datacenter
		gatewayKey.Partition = target.Partition
	case structs.MeshGatewayModeLocal:
		gatewayKey = localKey
	}

	if forMeshGateway || gatewayKey.IsEmpty() || localKey.Matches(target.Datacenter, target.Partition) {
		// Gateways are not needed if the request isn't for a remote DC or partition.
		return loadAssignmentEndpointGroup{
			Endpoints:   realEndpoints,
			OnlyPassing: target.Subset.OnlyPassing,
		}, true
	}

	// If using a mesh gateway we need to pull those endpoints instead.
	gatewayEndpoints, ok := gatewayHealth[gatewayKey.String()]
	if !ok {
		// skip the cluster if we're still populating the snapshot
		return loadAssignmentEndpointGroup{}, false
	}

	// But we will use the health from the actual backend service.
	overallHealth := pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
	for _, ep := range realEndpoints {
		health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
		if health == pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY {
			overallHealth = pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY
			break
		}
	}

	return loadAssignmentEndpointGroup{
		Endpoints:      gatewayEndpoints,
		OverrideHealth: overallHealth,
	}, true
}

func calculateEndpointHealthAndWeight(
	ep structs.CheckServiceNode,
	onlyPassing bool,
) (pbproxystate.HealthStatus, int) {
	healthStatus := pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY
	weight := 1
	if ep.Service.Weights != nil {
		weight = ep.Service.Weights.Passing
	}

	for _, chk := range ep.Checks {
		if chk.Status == api.HealthCritical {
			healthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
		}
		if onlyPassing && chk.Status != api.HealthPassing {
			healthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
		}
		if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
			weight = ep.Service.Weights.Warning
		}
	}
	// Make weights fit Envoy's limits. A zero weight means that either the Warning
	// (likely) or Passing (weirdly) weight has been set to 0, effectively making
	// this instance unhealthy, so it should not be sent traffic.
	if weight < 1 {
		healthStatus = pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY
		weight = 1
	}
	if weight > 128 {
		weight = 128
	}
	return healthStatus, weight
}
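A small worked example of the weighting rules above, using a hypothetical instance; the helper name and the literal values are assumptions, while the struct fields follow those referenced in this file:

```go
// exampleWeightClamping is a hypothetical illustration: one warning-state check
// combined with Weights{Passing: 10, Warning: 0}.
func exampleWeightClamping() {
	ep := structs.CheckServiceNode{
		Node: &structs.Node{Node: "node-1", Datacenter: "dc1"},
		Service: &structs.NodeService{
			Service: "api",
			Weights: &structs.Weights{Passing: 10, Warning: 0},
		},
		Checks: structs.HealthChecks{
			&structs.HealthCheck{Status: api.HealthWarning},
		},
	}

	status, weight := calculateEndpointHealthAndWeight(ep, false)
	// The warning check selects the Warning weight (0), which is then clamped to 1,
	// and the endpoint is reported unhealthy.
	fmt.Println(status, weight)
}
```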