mirror of https://github.com/hashicorp/consul
360 lines
12 KiB
Go
360 lines
12 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package discovery
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"strings"
|
|
"sync/atomic"
|
|
|
|
"golang.org/x/exp/slices"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/agent/config"
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
)
|
|
|
|
// v2DataFetcherDynamicConfig holds the subset of V2 data fetcher settings that
// can be replaced at runtime. A pointer to it is stored in
// V2DataFetcher.dynamicConfig and swapped as a whole by LoadConfig.
type v2DataFetcherDynamicConfig struct {
	// onlyPassing, when true, makes getEndpointWeight filter out endpoints
	// whose health status is warning (in addition to the always-filtered ones).
	onlyPassing bool
}
|
|
|
|
// V2DataFetcher is used to fetch data from the V2 catalog.
|
|
type V2DataFetcher struct {
|
|
client pbresource.ResourceServiceClient
|
|
logger hclog.Logger
|
|
|
|
// Requests inherit the partition of the agent unless otherwise specified.
|
|
defaultPartition string
|
|
|
|
dynamicConfig atomic.Value
|
|
}
|
|
|
|
// NewV2DataFetcher creates a new V2 data fetcher.
|
|
func NewV2DataFetcher(config *config.RuntimeConfig, client pbresource.ResourceServiceClient, logger hclog.Logger) *V2DataFetcher {
|
|
f := &V2DataFetcher{
|
|
client: client,
|
|
logger: logger,
|
|
defaultPartition: config.PartitionOrDefault(),
|
|
}
|
|
f.LoadConfig(config)
|
|
return f
|
|
}
|
|
|
|
// LoadConfig loads the configuration for the V2 data fetcher.
|
|
func (f *V2DataFetcher) LoadConfig(config *config.RuntimeConfig) {
|
|
dynamicConfig := &v2DataFetcherDynamicConfig{
|
|
onlyPassing: config.DNSOnlyPassing,
|
|
}
|
|
f.dynamicConfig.Store(dynamicConfig)
|
|
}
|
|
|
|
// FetchNodes fetches A/AAAA/CNAME
|
|
func (f *V2DataFetcher) FetchNodes(ctx Context, req *QueryPayload) ([]*Result, error) {
|
|
// TODO (v2-dns): NET-6623 - Implement FetchNodes
|
|
// Make sure that we validate that namespace is not provided here
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
// FetchEndpoints fetches records for A/AAAA/CNAME or SRV requests for services
|
|
func (f *V2DataFetcher) FetchEndpoints(reqContext Context, req *QueryPayload, lookupType LookupType) ([]*Result, error) {
|
|
if lookupType != LookupTypeService {
|
|
return nil, ErrNotSupported
|
|
}
|
|
|
|
configCtx := f.dynamicConfig.Load().(*v2DataFetcherDynamicConfig)
|
|
|
|
serviceEndpoints := pbcatalog.ServiceEndpoints{}
|
|
serviceEndpointsResource, err := f.fetchResource(reqContext, *req, pbcatalog.ServiceEndpointsType, &serviceEndpoints)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
f.logger.Trace("shuffling endpoints", "name", req.Name, "endpoints", len(serviceEndpoints.Endpoints))
|
|
|
|
// Shuffle the endpoints slice
|
|
shuffleFunc := func(i, j int) {
|
|
serviceEndpoints.Endpoints[i], serviceEndpoints.Endpoints[j] = serviceEndpoints.Endpoints[j], serviceEndpoints.Endpoints[i]
|
|
}
|
|
rand.Shuffle(len(serviceEndpoints.Endpoints), shuffleFunc)
|
|
|
|
// Convert the service endpoints to results up to the limit
|
|
limit := req.Limit
|
|
if len(serviceEndpoints.Endpoints) < limit || limit == 0 {
|
|
limit = len(serviceEndpoints.Endpoints)
|
|
}
|
|
|
|
results := make([]*Result, 0, limit)
|
|
for _, endpoint := range serviceEndpoints.Endpoints[:limit] {
|
|
|
|
// First we check the endpoint first to make sure that the requested port is matched from the service.
|
|
// We error here because we expect all endpoints to have the same ports as the service.
|
|
ports := getResultPorts(req, endpoint.Ports) //assuming the logic changed in getResultPorts
|
|
if len(ports) == 0 {
|
|
f.logger.Debug("could not find matching port in endpoint", "name", req.Name, "port", req.PortName)
|
|
return nil, ErrNotFound
|
|
}
|
|
|
|
address, err := f.addressFromWorkloadAddresses(endpoint.Addresses, req.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
weight, ok := getEndpointWeight(endpoint, configCtx)
|
|
if !ok {
|
|
f.logger.Debug("endpoint filtered out because of health status", "name", req.Name, "endpoint", endpoint.GetTargetRef().GetName())
|
|
continue
|
|
}
|
|
|
|
result := &Result{
|
|
Node: &Location{
|
|
Address: address,
|
|
Name: endpoint.GetTargetRef().GetName(),
|
|
},
|
|
Type: ResultTypeWorkload,
|
|
Tenancy: ResultTenancy{
|
|
Namespace: serviceEndpointsResource.GetId().GetTenancy().GetNamespace(),
|
|
Partition: serviceEndpointsResource.GetId().GetTenancy().GetPartition(),
|
|
},
|
|
DNS: DNSConfig{
|
|
Weight: weight,
|
|
},
|
|
Ports: ports,
|
|
}
|
|
results = append(results, result)
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// FetchVirtualIP fetches A/AAAA records for virtual IPs
|
|
func (f *V2DataFetcher) FetchVirtualIP(ctx Context, req *QueryPayload) (*Result, error) {
|
|
// TODO (v2-dns): NET-6624 - Implement FetchVirtualIP
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
// FetchRecordsByIp is used for PTR requests to look up a service/node from an IP.
|
|
func (f *V2DataFetcher) FetchRecordsByIp(ctx Context, ip net.IP) ([]*Result, error) {
|
|
// TODO (v2-dns): NET-6795 - Implement FetchRecordsByIp
|
|
// Validate non-nil IP
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
// FetchWorkload is used to fetch a single workload from the V2 catalog.
|
|
// V2-only.
|
|
func (f *V2DataFetcher) FetchWorkload(reqContext Context, req *QueryPayload) (*Result, error) {
|
|
workload := pbcatalog.Workload{}
|
|
resourceObj, err := f.fetchResource(reqContext, *req, pbcatalog.WorkloadType, &workload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// First we check the endpoint first to make sure that the requested port is matched from the service.
|
|
// We error here because we expect all endpoints to have the same ports as the service.
|
|
ports := getResultPorts(req, workload.Ports) //assuming the logic changed in getResultPorts
|
|
if ports == nil || len(ports) == 0 {
|
|
f.logger.Debug("could not find matching port in endpoint", "name", req.Name, "port", req.PortName)
|
|
return nil, ErrNotFound
|
|
}
|
|
|
|
address, err := f.addressFromWorkloadAddresses(workload.Addresses, req.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tenancy := resourceObj.GetId().GetTenancy()
|
|
result := &Result{
|
|
Node: &Location{
|
|
Address: address,
|
|
Name: resourceObj.GetId().GetName(),
|
|
},
|
|
Type: ResultTypeWorkload,
|
|
Tenancy: ResultTenancy{
|
|
Namespace: tenancy.GetNamespace(),
|
|
Partition: tenancy.GetPartition(),
|
|
},
|
|
Ports: ports,
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// FetchPreparedQuery is used to fetch a prepared query from the V2 catalog.
|
|
// Deprecated in V2.
|
|
func (f *V2DataFetcher) FetchPreparedQuery(ctx Context, req *QueryPayload) ([]*Result, error) {
|
|
return nil, ErrNotSupported
|
|
}
|
|
|
|
func (f *V2DataFetcher) NormalizeRequest(req *QueryPayload) {
|
|
// If we do not have an explicit partition in the request, we use the agent's
|
|
if req.Tenancy.Partition == "" {
|
|
req.Tenancy.Partition = f.defaultPartition
|
|
}
|
|
}
|
|
|
|
// ValidateRequest throws an error is any of the deprecated V1 input fields are used in a QueryByName for this data fetcher.
|
|
func (f *V2DataFetcher) ValidateRequest(_ Context, req *QueryPayload) error {
|
|
if req.Tag != "" {
|
|
return ErrNotSupported
|
|
}
|
|
if req.SourceIP != nil {
|
|
return ErrNotSupported
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// fetchResource is used to read a single resource from the V2 catalog and cast into a concrete type.
|
|
func (f *V2DataFetcher) fetchResource(reqContext Context, req QueryPayload, kind *pbresource.Type, payload proto.Message) (*pbresource.Resource, error) {
|
|
// Query the resource service for the ServiceEndpoints by name and tenancy
|
|
resourceReq := pbresource.ReadRequest{
|
|
Id: &pbresource.ID{
|
|
Name: req.Name,
|
|
Type: kind,
|
|
Tenancy: queryTenancyToResourceTenancy(req.Tenancy),
|
|
},
|
|
}
|
|
|
|
f.logger.Trace("fetching "+kind.String(), "name", req.Name)
|
|
resourceCtx := metadata.AppendToOutgoingContext(context.Background(), "x-consul-token", reqContext.Token)
|
|
|
|
// If the service is not found, return nil and an error equivalent to NXDOMAIN
|
|
response, err := f.client.Read(resourceCtx, &resourceReq)
|
|
switch {
|
|
case grpcNotFoundErr(err):
|
|
f.logger.Debug(kind.String()+" not found", "name", req.Name)
|
|
return nil, ErrNotFound
|
|
case err != nil:
|
|
f.logger.Error("error fetching "+kind.String(), "name", req.Name)
|
|
return nil, fmt.Errorf("error fetching %s: %w", kind.String(), err)
|
|
// default: fallthrough
|
|
}
|
|
|
|
data := response.GetResource().GetData()
|
|
if err := data.UnmarshalTo(payload); err != nil {
|
|
f.logger.Error("error unmarshalling "+kind.String(), "name", req.Name)
|
|
return nil, fmt.Errorf("error unmarshalling %s: %w", kind.String(), err)
|
|
}
|
|
return response.GetResource(), nil
|
|
}
|
|
|
|
// addressFromWorkloadAddresses returns one address from the workload addresses.
|
|
func (f *V2DataFetcher) addressFromWorkloadAddresses(addresses []*pbcatalog.WorkloadAddress, name string) (string, error) {
|
|
// TODO: (v2-dns): we will need to intelligently return the right workload address based on either the translate
|
|
// address setting or the locality of the requester. Workloads must have at least one.
|
|
// We also need to make sure that we filter out unix sockets here.
|
|
address := addresses[0].GetHost()
|
|
if strings.HasPrefix(address, "unix://") {
|
|
f.logger.Error("unix sockets are currently unsupported in workload results", "name", name)
|
|
return "", ErrNotFound
|
|
}
|
|
return address, nil
|
|
}
|
|
|
|
// getEndpointWeight returns the weight of the endpoint and a boolean indicating if the endpoint should be included
|
|
// based on it's health status.
|
|
func getEndpointWeight(endpoint *pbcatalog.Endpoint, configCtx *v2DataFetcherDynamicConfig) (uint32, bool) {
|
|
health := endpoint.GetHealthStatus().Enum()
|
|
if health == nil {
|
|
return 0, false
|
|
}
|
|
|
|
// Filter based on health status and agent config
|
|
// This is also a good opportunity to see if SRV weights are set
|
|
var weight uint32
|
|
switch *health {
|
|
case pbcatalog.Health_HEALTH_PASSING:
|
|
weight = endpoint.GetDns().GetWeights().GetPassing()
|
|
case pbcatalog.Health_HEALTH_CRITICAL:
|
|
return 0, false // always filtered out
|
|
case pbcatalog.Health_HEALTH_WARNING:
|
|
if configCtx.onlyPassing {
|
|
return 0, false // filtered out
|
|
}
|
|
weight = endpoint.GetDns().GetWeights().GetWarning()
|
|
default:
|
|
// Everything else can be filtered out
|
|
return 0, false
|
|
}
|
|
|
|
// Important! double-check the weight in the case DNS weights are not set
|
|
if weight == 0 {
|
|
weight = 1
|
|
}
|
|
return weight, true
|
|
}
|
|
|
|
// getResultPorts conditionally returns ports from a map based on a query. The results are sorted by name.
|
|
func getResultPorts(req *QueryPayload, workloadPorts map[string]*pbcatalog.WorkloadPort) []Port {
|
|
if len(workloadPorts) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var ports []Port
|
|
if req.PortName != "" {
|
|
// Make sure the workload implements that port name.
|
|
if _, ok := workloadPorts[req.PortName]; !ok {
|
|
return nil
|
|
}
|
|
// In the case that the query asked for a specific port, we only return that port.
|
|
ports = []Port{
|
|
{
|
|
Name: req.PortName,
|
|
Number: workloadPorts[req.PortName].Port,
|
|
},
|
|
}
|
|
} else {
|
|
// If the client didn't specify a particular port, return all the workload ports.
|
|
for name, port := range workloadPorts {
|
|
ports = append(ports, Port{
|
|
Name: name,
|
|
Number: port.Port,
|
|
})
|
|
}
|
|
// Stable Sort
|
|
slices.SortStableFunc(ports, func(i, j Port) int {
|
|
if i.Name < j.Name {
|
|
return -1
|
|
} else if i.Name > j.Name {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
}
|
|
return ports
|
|
}
|
|
|
|
// queryTenancyToResourceTenancy converts a QueryTenancy to a pbresource.Tenancy.
|
|
func queryTenancyToResourceTenancy(qTenancy QueryTenancy) *pbresource.Tenancy {
|
|
rTenancy := resource.DefaultNamespacedTenancy()
|
|
|
|
// If the request has any tenancy specified, it overrides the defaults.
|
|
if qTenancy.Namespace != "" {
|
|
rTenancy.Namespace = qTenancy.Namespace
|
|
}
|
|
// In the case of partition, we have the agent's partition as the fallback.
|
|
if qTenancy.Partition != "" {
|
|
rTenancy.Partition = qTenancy.Partition
|
|
}
|
|
|
|
return rTenancy
|
|
}
|
|
|
|
// grpcNotFoundErr returns true if the error is a gRPC status error with a code of NotFound.
|
|
func grpcNotFoundErr(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
s, ok := status.FromError(err)
|
|
return ok && s.Code() == codes.NotFound
|
|
}
|