Consul is a distributed, highly available, and data center aware solution to connect and configure applications across dynamic, distributed infrastructure.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

598 lines
19 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package discovery
import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// staleCounterThreshold is the staleness above which a served response is
// counted in the "dns", "stale_queries" metric.
const staleCounterThreshold = 5 * time.Second
// v1DataFetcherDynamicConfig is the reloadable part of the V1 data fetcher's
// configuration. A fresh value is built by LoadConfig and published through
// V1DataFetcher.dynamicConfig, so a value is treated as read-only once stored.
type v1DataFetcherDynamicConfig struct {
	// Default request tenancy: identifies the local agent. datacenter is
	// also used as the target datacenter for reverse (PTR) lookups, and all
	// four are sent as the Agent query source of prepared queries.
	// NOTE(review): nodePartition is not populated by LoadConfig in this file.
	datacenter, segmentName, nodeName, nodePartition string

	// Catalog configuration.

	// allowStale is copied into QueryOptions.AllowStale of outgoing requests.
	allowStale bool
	// maxStale is the staleness bound above which node and prepared-query
	// lookups are re-issued with AllowStale disabled.
	maxStale time.Duration
	// useCache selects the agent cache over a direct RPC.
	useCache bool
	// cacheMaxAge is copied into QueryOptions.MaxAge of outgoing requests.
	cacheMaxAge time.Duration
	// onlyPassing is handed to the health filter applied to service nodes.
	onlyPassing bool
}
// V1DataFetcher is used to fetch data from the V1 catalog.
// All catalog access goes through the function fields below, which are
// supplied by the caller of NewV1DataFetcher.
type V1DataFetcher struct {
// TODO(v2-dns): store this in the config.
// defaultEnterpriseMeta supplies the namespace/partition reported for nodes
// found by FetchRecordsByIp and the wildcard-namespace scope of its service search.
defaultEnterpriseMeta acl.EnterpriseMeta
// dynamicConfig holds a *v1DataFetcherDynamicConfig; it is written by
// LoadConfig and loaded once at the start of each fetch.
dynamicConfig atomic.Value
logger hclog.Logger
// getFromCacheFunc serves a request through the agent cache; it is used
// when the dynamic config has useCache set.
getFromCacheFunc func(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
// rpcFunc performs a direct RPC identified by method name.
rpcFunc func(ctx context.Context, method string, args interface{}, reply interface{}) error
// rpcFuncForServiceNodes looks up the health-checked nodes of a service.
rpcFuncForServiceNodes func(ctx context.Context, req structs.ServiceSpecificRequest) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error)
// rpcFuncForSamenessGroup is not called in the visible part of this file;
// presumably it backs sameness-group lookups — TODO confirm.
rpcFuncForSamenessGroup func(ctx context.Context, req *structs.ConfigEntryQuery) (structs.SamenessGroupConfigEntry, cache.ResultMeta, error)
// translateServicePortFunc maps a service port (given the node's datacenter
// and the service's tagged addresses) to the port number put into results.
translateServicePortFunc func(dc string, port int, taggedAddresses map[string]structs.ServiceAddress) int
}
// NewV1DataFetcher creates a new V1 data fetcher.
//
// entMeta is copied by value and must not be nil. The function arguments are
// stored as-is and used for every catalog access. The initial dynamic
// configuration is taken from config via LoadConfig.
func NewV1DataFetcher(
	config *config.RuntimeConfig,
	entMeta *acl.EnterpriseMeta,
	getFromCacheFunc func(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error),
	rpcFunc func(ctx context.Context, method string, args interface{}, reply interface{}) error,
	rpcFuncForServiceNodes func(ctx context.Context, req structs.ServiceSpecificRequest) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error),
	rpcFuncForSamenessGroup func(ctx context.Context, req *structs.ConfigEntryQuery) (structs.SamenessGroupConfigEntry, cache.ResultMeta, error),
	translateServicePortFunc func(dc string, port int, taggedAddresses map[string]structs.ServiceAddress) int,
	logger hclog.Logger,
) *V1DataFetcher {
	fetcher := new(V1DataFetcher)
	fetcher.defaultEnterpriseMeta = *entMeta
	fetcher.logger = logger

	// Catalog access hooks.
	fetcher.getFromCacheFunc = getFromCacheFunc
	fetcher.rpcFunc = rpcFunc
	fetcher.rpcFuncForServiceNodes = rpcFuncForServiceNodes
	fetcher.rpcFuncForSamenessGroup = rpcFuncForSamenessGroup
	fetcher.translateServicePortFunc = translateServicePortFunc

	fetcher.LoadConfig(config)
	return fetcher
}
// LoadConfig loads the configuration for the V1 data fetcher.
//
// It snapshots the relevant settings of config into a new
// v1DataFetcherDynamicConfig and stores it in f.dynamicConfig, replacing the
// previous snapshot as a whole.
func (f *V1DataFetcher) LoadConfig(config *config.RuntimeConfig) {
	// NOTE(review): nodePartition is left at its zero value here, although
	// FetchPreparedQuery forwards it as Agent.NodePartition — confirm intended.
	f.dynamicConfig.Store(&v1DataFetcherDynamicConfig{
		datacenter:  config.Datacenter,
		segmentName: config.SegmentName,
		nodeName:    config.NodeName,

		allowStale:  config.DNSAllowStale,
		maxStale:    config.DNSMaxStale,
		useCache:    config.DNSUseCache,
		cacheMaxAge: config.DNSCacheMaxAge,
		onlyPassing: config.DNSOnlyPassing,
	})
}
// FetchNodes fetches the node record used to answer A/AAAA/CNAME queries.
//
// The node named req.Name is looked up through fetchNode using the tenancy
// from req and the ACL token from ctx. A failed lookup is returned wrapped; a
// reply without NodeServices yields ErrNotFound. On success exactly one
// Result of type ResultTypeNode is returned.
func (f *V1DataFetcher) FetchNodes(ctx Context, req *QueryPayload) ([]*Result, error) {
	cfg := f.dynamicConfig.Load().(*v1DataFetcherDynamicConfig)

	lookup := &structs.NodeSpecificRequest{
		Datacenter: req.Tenancy.Datacenter,
		PeerName:   req.Tenancy.Peer,
		Node:       req.Name,
		QueryOptions: structs.QueryOptions{
			Token:      ctx.Token,
			AllowStale: cfg.allowStale,
		},
		EnterpriseMeta: queryTenancyToEntMeta(req.Tenancy),
	}

	reply, err := f.fetchNode(cfg, lookup)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed rpc request: %w", err)
	case reply.NodeServices == nil:
		// No NodeServices in the reply is reported as not found.
		return nil, ErrNotFound
	}

	// NOTE(review): assumes NodeServices.Node is non-nil whenever
	// NodeServices is — confirm against the RPC contract.
	node := reply.NodeServices.Node
	found := &Result{
		Node: &Location{
			Name:    node.Node,
			Address: node.Address,
		},
		Type:     ResultTypeNode,
		Metadata: node.Meta,
		Tenancy: ResultTenancy{
			// Namespace is not required because nodes are not namespaced
			Partition:  node.GetEnterpriseMeta().PartitionOrDefault(),
			Datacenter: node.Datacenter,
		},
	}
	return []*Result{found}, nil
}
// FetchEndpoints fetches records for A/AAAA/CNAME or SRV requests for services.
// It logs the request at debug level and delegates to fetchService with the
// currently loaded dynamic configuration.
func (f *V1DataFetcher) FetchEndpoints(ctx Context, req *QueryPayload, lookupType LookupType) ([]*Result, error) {
	trace := fmt.Sprintf("FetchEndpoints - req: %+v / lookupType: %+v", req, lookupType)
	f.logger.Debug(trace)

	current := f.dynamicConfig.Load().(*v1DataFetcherDynamicConfig)
	return f.fetchService(ctx, req, current, lookupType)
}
// FetchVirtualIP fetches A/AAAA records for virtual IPs.
//
// It issues a "Catalog.VirtualIPForService" RPC for the service named
// req.Name and returns a single Result of type ResultTypeVirtual whose
// address is the string returned by the RPC. RPC errors are returned as-is.
func (f *V1DataFetcher) FetchVirtualIP(ctx Context, req *QueryPayload) (*Result, error) {
	// The datacenter of the request is not specified because cross-datacenter virtual IP
	// queries are not supported. This guard rail is in place because virtual IPs are allocated
	// within a DC, therefore their uniqueness is not guaranteed globally.
	lookup := structs.ServiceSpecificRequest{
		PeerName:       req.Tenancy.Peer,
		ServiceName:    req.Name,
		EnterpriseMeta: queryTenancyToEntMeta(req.Tenancy),
		QueryOptions: structs.QueryOptions{
			Token: ctx.Token,
		},
	}

	var virtualIP string
	err := f.rpcFunc(context.Background(), "Catalog.VirtualIPForService", &lookup, &virtualIP)
	if err != nil {
		return nil, err
	}

	return &Result{
		Service: &Location{
			Name:    req.Name,
			Address: virtualIP,
		},
		Type: ResultTypeVirtual,
	}, nil
}
// FetchRecordsByIp is used for PTR requests to look up a service/node from an IP.
// The search is performed in the agent's partition and over all namespaces (or those allowed by the ACL token).
//
// Nodes are searched first; services are only consulted when no node has the
// address. The first match is returned as a one-element slice. A nil ip
// yields ErrNotSupported. RPC failures are not propagated: a failed lookup is
// treated like an empty one, and when nothing matches a generic error is
// returned.
func (f *V1DataFetcher) FetchRecordsByIp(reqCtx Context, ip net.IP) ([]*Result, error) {
	if ip == nil {
		return nil, ErrNotSupported
	}

	cfg := f.dynamicConfig.Load().(*v1DataFetcherDynamicConfig)
	wanted := ip.String()
	opts := structs.QueryOptions{
		Token:      reqCtx.Token,
		AllowStale: cfg.allowStale,
	}

	// TODO: Replace ListNodes with an internal RPC that can do the filter
	// server side to avoid transferring the entire node list.
	nodeArgs := structs.DCSpecificRequest{
		Datacenter:   cfg.datacenter,
		QueryOptions: opts,
	}
	var nodeReply structs.IndexedNodes
	if err := f.rpcFunc(context.Background(), "Catalog.ListNodes", &nodeArgs, &nodeReply); err == nil {
		for _, node := range nodeReply.Nodes {
			if node.Address != wanted {
				continue
			}
			match := &Result{
				Node: &Location{
					Name:    node.Node,
					Address: node.Address,
				},
				Type: ResultTypeNode,
				Tenancy: ResultTenancy{
					Namespace:  f.defaultEnterpriseMeta.NamespaceOrDefault(),
					Partition:  f.defaultEnterpriseMeta.PartitionOrDefault(),
					Datacenter: cfg.datacenter,
				},
			}
			return []*Result{match}, nil
		}
	}

	// only look into the services if we didn't find a node
	svcArgs := structs.ServiceSpecificRequest{
		Datacenter:     cfg.datacenter,
		QueryOptions:   opts,
		ServiceAddress: wanted,
		EnterpriseMeta: *f.defaultEnterpriseMeta.WithWildcardNamespace(),
	}
	var svcReply structs.IndexedServiceNodes
	if err := f.rpcFunc(context.Background(), "Catalog.ServiceNodes", &svcArgs, &svcReply); err == nil {
		for _, svc := range svcReply.ServiceNodes {
			if svc.ServiceAddress != wanted {
				continue
			}
			match := &Result{
				Service: &Location{
					Name:    svc.ServiceName,
					Address: svc.ServiceAddress,
				},
				Type: ResultTypeService,
				Node: &Location{
					Name:    svc.Node,
					Address: svc.Address,
				},
				Tenancy: ResultTenancy{
					Namespace:  svc.NamespaceOrEmpty(),
					Partition:  svc.PartitionOrEmpty(),
					Datacenter: svc.Datacenter,
				},
			}
			return []*Result{match}, nil
		}
	}

	// nothing found locally, recurse
	// TODO: (v2-dns) implement recursion
	//d.handleRecurse(resp, req)
	return nil, fmt.Errorf("unhandled error in FetchRecordsByIp")
}
// FetchWorkload fetches a single Result associated with a V2 Workload.
// Workloads are V2-only, so the V1 data fetcher ignores its arguments and
// always returns ErrNotSupported.
func (f *V1DataFetcher) FetchWorkload(ctx Context, req *QueryPayload) (*Result, error) {
return nil, ErrNotSupported
}
// FetchPreparedQuery evaluates the results of a prepared query.
// Prepared queries are deprecated in V2.
//
// The query identified by req.Name is executed in req.Tenancy.Datacenter with
// the ACL token from ctx. The local agent's identity and the request's source
// IP are passed along so the server can sort results by distance.
//
// Every error returned by this method is an ECSNotGlobalError. That includes
// the success path, where the results are returned together with an
// ECSNotGlobalError that wraps no error.
// NOTE(review): callers presumably unwrap it before testing for failure —
// confirm against the caller.
// A query that does not exist, or that yields no nodes, is reported as a
// wrapped ErrNotFound.
//
// If the query reply carries a DNS TTL that parses as a duration, it is
// attached to every result as a TTL override in whole seconds. An unparsable
// TTL is logged and ignored.
func (f *V1DataFetcher) FetchPreparedQuery(ctx Context, req *QueryPayload) ([]*Result, error) {
	cfg := f.dynamicConfig.Load().(*v1DataFetcherDynamicConfig)

	// Execute the prepared query.
	args := structs.PreparedQueryExecuteRequest{
		Datacenter:    req.Tenancy.Datacenter,
		QueryIDOrName: req.Name,
		QueryOptions: structs.QueryOptions{
			Token:      ctx.Token,
			AllowStale: cfg.allowStale,
			MaxAge:     cfg.cacheMaxAge,
		},

		// Always pass the local agent through. In the DNS interface, there
		// is no provision for passing additional query parameters, so we
		// send the local agent's data through to allow distance sorting
		// relative to ourself on the server side.
		Agent: structs.QuerySource{
			Datacenter:    cfg.datacenter,
			Segment:       cfg.segmentName,
			Node:          cfg.nodeName,
			NodePartition: cfg.nodePartition,
		},
		Source: structs.QuerySource{
			Ip: req.SourceIP.String(),
		},
	}

	out, err := f.executePreparedQuery(cfg, args)
	if err != nil {
		// errors.Is() doesn't work with errors.New() so we need to check the error message.
		if err.Error() == structs.ErrQueryNotFound.Error() {
			err = ErrNotFound
		}
		return nil, ECSNotGlobalError{err}
	}

	// TODO (slackpad) - What's a safe limit we can set here? It seems like
	// with dup filtering done at this level we need to get everything to
	// match the previous behavior. We can optimize by pushing more filtering
	// into the query execution, but for now I think we need to get the full
	// response. We could also choose a large arbitrary number that will
	// likely work in practice, like 10*maxUDPAnswerLimit which should help
	// reduce bandwidth if there are thousands of nodes available.

	// Determine the TTL. The parse should never fail since we vet it when
	// the query is created, but we check anyway. If the query didn't
	// specify a TTL then no override is set.
	var ttlOverride *uint32
	if out.DNS.TTL != "" {
		ttl, err := time.ParseDuration(out.DNS.TTL)
		if err != nil {
			// Only warn when the parse actually failed; a valid TTL must
			// not produce a "failed to parse" log line.
			f.logger.Warn("Failed to parse TTL for prepared query , ignoring",
				"ttl", out.DNS.TTL,
				"prepared_query", req.Name,
			)
		} else {
			ttlSec := uint32(ttl / time.Second)
			ttlOverride = &ttlSec
		}
	}

	// If we have no nodes, return not found!
	if len(out.Nodes) == 0 {
		return nil, ECSNotGlobalError{ErrNotFound}
	}

	// Perform a random shuffle
	out.Nodes.Shuffle()
	return f.buildResultsFromServiceNodes(out.Nodes, req, ttlOverride), ECSNotGlobalError{}
}
// executePreparedQuery is used to execute a PreparedQuery against the Consul catalog.
//
// If cfg.useCache is set the request is served through the agent cache;
// otherwise it is sent as a "PreparedQuery.Execute" RPC. When stale reads
// were allowed and the reply is staler than cfg.maxStale, the request is
// re-issued once with AllowStale disabled. A reply staler than
// staleCounterThreshold (but within maxStale) increments the
// "dns", "stale_queries" counter.
//
// It returns a non-nil response on success. A cache entry of an unexpected
// type is reported as an error rather than as a nil response with a nil error.
func (f *V1DataFetcher) executePreparedQuery(cfg *v1DataFetcherDynamicConfig, args structs.PreparedQueryExecuteRequest) (*structs.PreparedQueryExecuteResponse, error) {
	var out structs.PreparedQueryExecuteResponse

RPC:
	if cfg.useCache {
		raw, m, err := f.getFromCacheFunc(context.TODO(), cachetype.PreparedQueryName, &args)
		if err != nil {
			return nil, err
		}
		reply, ok := raw.(*structs.PreparedQueryExecuteResponse)
		if !ok {
			// This should never happen, but we want to protect against panics.
			// err is nil at this point, so an explicit error is required:
			// returning it would hand the caller a nil response with no error.
			return nil, fmt.Errorf("internal error: response type not correct")
		}

		f.logger.Trace("cache results for prepared query",
			"cache_hit", m.Hit,
			"prepared_query", args.QueryIDOrName,
		)

		out = *reply
	} else {
		if err := f.rpcFunc(context.Background(), "PreparedQuery.Execute", &args, &out); err != nil {
			return nil, err
		}
	}

	// Verify that request is not too stale, redo the request.
	// NOTE(review): unlike fetchNode, the retry still goes through the cache
	// when cfg.useCache is set — confirm this is intended.
	if args.AllowStale {
		if out.LastContact > cfg.maxStale {
			args.AllowStale = false
			f.logger.Warn("Query results too stale, re-requesting")
			goto RPC
		} else if out.LastContact > staleCounterThreshold {
			metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
		}
	}

	return &out, nil
}
// ValidateRequest reports whether the V1 data fetcher is able to serve req.
// Requests that enable failover or name a port are rejected with
// ErrNotSupported; any other request is accepted or rejected by
// validateEnterpriseTenancy based on its tenancy.
func (f *V1DataFetcher) ValidateRequest(_ Context, req *QueryPayload) error {
	if req.EnableFailover || req.PortName != "" {
		return ErrNotSupported
	}
	return validateEnterpriseTenancy(req.Tenancy)
}
// buildResultsFromServiceNodes builds a list of results from a list of nodes.
//
// At most req.Limit nodes are converted, taken from the front of nodes; a
// limit of zero, or one larger than len(nodes), converts all of them. Each
// Result is of type ResultTypeService and carries ttlOverride (which may be
// nil), the weight computed by findWeight and the translated service port.
func (f *V1DataFetcher) buildResultsFromServiceNodes(nodes []structs.CheckServiceNode, req *QueryPayload, ttlOverride *uint32) []*Result {
	// Start from "everything" and only narrow when a usable limit is given.
	limit := len(nodes)
	if req.Limit != 0 && req.Limit <= len(nodes) {
		limit = req.Limit
	}

	results := make([]*Result, 0, limit)
	for _, n := range nodes[:limit] {
		results = append(results, &Result{
			Service: &Location{
				Name:    n.Service.Service,
				Address: n.Service.Address,
			},
			Node: &Location{
				Name:    n.Node.Node,
				Address: n.Node.Address,
			},
			Type: ResultTypeService,
			DNS: DNSConfig{
				TTL:    ttlOverride,
				Weight: uint32(findWeight(n)),
			},
			PortNumber: uint32(f.translateServicePortFunc(n.Node.Datacenter, n.Service.Port, n.Service.TaggedAddresses)),
			Metadata:   n.Node.Meta,
			Tenancy: ResultTenancy{
				Namespace:  n.Service.NamespaceOrEmpty(),
				Partition:  n.Service.PartitionOrEmpty(),
				Datacenter: n.Node.Datacenter,
			},
		})
	}
	return results
}
// fetchNode is used to look up a node in the Consul catalog within NodeServices.
//
// If cfg.useCache is set the first attempt is served from the agent cache,
// otherwise by a "Catalog.NodeServices" RPC. When stale reads were allowed
// and the reply is staler than cfg.maxStale, the lookup is repeated once with
// AllowStale disabled and the cache bypassed. A reply staler than
// staleCounterThreshold (but within maxStale) increments the
// "dns", "stale_queries" counter. Note that args.AllowStale is modified in
// place on retry.
func (f *V1DataFetcher) fetchNode(cfg *v1DataFetcherDynamicConfig, args *structs.NodeSpecificRequest) (*structs.IndexedNodeServices, error) {
	var out structs.IndexedNodeServices

	viaCache := cfg.useCache
	for {
		if viaCache {
			raw, _, err := f.getFromCacheFunc(context.TODO(), cachetype.NodeServicesName, args)
			if err != nil {
				return nil, err
			}
			reply, ok := raw.(*structs.IndexedNodeServices)
			if !ok {
				// This should never happen, but we want to protect against panics
				return nil, fmt.Errorf("internal error: response type not correct")
			}
			out = *reply
		} else {
			// NOTE(review): args is already a pointer, so &args hands rpcFunc
			// a **structs.NodeSpecificRequest — confirm the RPC layer expects this.
			if err := f.rpcFunc(context.Background(), "Catalog.NodeServices", &args, &out); err != nil {
				return nil, err
			}
		}

		// Staleness only matters when a stale read was allowed.
		if !args.AllowStale {
			break
		}

		// Too stale: redo the request consistently and without the cache.
		if out.LastContact > cfg.maxStale {
			args.AllowStale = false
			viaCache = false
			f.logger.Warn("Query results too stale, re-requesting")
			continue
		}

		if out.LastContact > staleCounterThreshold {
			metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
		}
		break
	}

	return &out, nil
}
// fetchService looks up a service, choosing the lookup strategy from the
// request tenancy: requests naming a sameness group go to
// fetchServiceFromSamenessGroup, all others to fetchServiceBasedOnTenancy.
func (f *V1DataFetcher) fetchService(ctx Context, req *QueryPayload,
	cfg *v1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) {
	f.logger.Debug("fetchService", "req", req)

	if req.Tenancy.SamenessGroup != "" {
		return f.fetchServiceFromSamenessGroup(ctx, req, cfg, lookupType)
	}
	return f.fetchServiceBasedOnTenancy(ctx, req, cfg, lookupType)
}
// fetchServiceBasedOnTenancy is used to look up a service in the Consul catalog based on its tenancy or default tenancy.
//
// Requests that name a sameness group are rejected. For peered requests the
// datacenter is cleared before the lookup. lookupType selects a Connect or
// Ingress lookup, and req.Tag, when set, is applied as a tag filter.
//
// An RPC error containing structs.ErrNoDCPath is mapped to
// ErrNoPathToDatacenter; other RPC errors are returned wrapped. ErrNotFound
// is returned when the service has no nodes, either before or after health
// filtering. Otherwise the surviving nodes are shuffled and converted to
// results, honouring req.Limit.
func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayload,
	cfg *v1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) {
	f.logger.Debug(fmt.Sprintf("fetchServiceBasedOnTenancy - req: %+v", req))
	if req.Tenancy.SamenessGroup != "" {
		return nil, errors.New("sameness groups are not allowed for service lookups based on tenancy")
	}

	// Peered lookups are sent without a datacenter.
	datacenter := req.Tenancy.Datacenter
	if req.Tenancy.Peer != "" {
		datacenter = ""
	}

	serviceTags := []string{}
	if req.Tag != "" {
		serviceTags = []string{req.Tag}
	}
	args := structs.ServiceSpecificRequest{
		PeerName:    req.Tenancy.Peer,
		Connect:     lookupType == LookupTypeConnect,
		Ingress:     lookupType == LookupTypeIngress,
		Datacenter:  datacenter,
		ServiceName: req.Name,
		ServiceTags: serviceTags,
		TagFilter:   req.Tag != "",
		QueryOptions: structs.QueryOptions{
			Token:            ctx.Token,
			AllowStale:       cfg.allowStale,
			MaxAge:           cfg.cacheMaxAge,
			UseCache:         cfg.useCache,
			MaxStaleDuration: cfg.maxStale,
		},
		EnterpriseMeta: queryTenancyToEntMeta(req.Tenancy),
	}

	out, _, err := f.rpcFuncForServiceNodes(context.TODO(), args)
	if err != nil {
		// The error may have crossed an RPC boundary, so it is matched by
		// message rather than by identity.
		if strings.Contains(err.Error(), structs.ErrNoDCPath.Error()) {
			return nil, ErrNoPathToDatacenter
		}
		return nil, fmt.Errorf("rpc request failed: %w", err)
	}

	// If we have no nodes, return not found!
	if len(out.Nodes) == 0 {
		return nil, ErrNotFound
	}

	// Filter out any service nodes due to health checks
	// We copy the slice to avoid modifying the result if it comes from the cache
	nodes := make(structs.CheckServiceNodes, len(out.Nodes))
	copy(nodes, out.Nodes)
	out.Nodes = nodes.Filter(cfg.onlyPassing)

	// If we have no nodes, return not found!
	if len(out.Nodes) == 0 {
		return nil, ErrNotFound
	}

	// Perform a random shuffle
	out.Nodes.Shuffle()
	return f.buildResultsFromServiceNodes(out.Nodes, req, nil), nil
}
// findWeight returns the weight of a service node.
//
// The weight is chosen from the aggregated status of the node-level checks
// (those with an empty ServiceName) and the checks belonging to the node's
// service: the service's Passing or Warning weight (1 each when the service
// declares no weights), 0 for maintenance or critical, and 1 for any other
// status.
func findWeight(node structs.CheckServiceNode) int {
	// By default, when only_passing is false, warning and passing nodes are returned
	// Those values will be used if using a client with support while server has no
	// support for weights
	passing, warning := 1, 1
	if w := node.Service.Weights; w != nil {
		passing, warning = w.Passing, w.Warning
	}

	relevant := make(api.HealthChecks, 0, len(node.Checks))
	for _, check := range node.Checks {
		// Skip checks that belong to a different service.
		if check.ServiceName != "" && check.ServiceName != node.Service.Service {
			continue
		}
		relevant = append(relevant, &api.HealthCheck{
			Node:        check.Node,
			CheckID:     string(check.CheckID),
			Name:        check.Name,
			Status:      check.Status,
			Notes:       check.Notes,
			Output:      check.Output,
			ServiceID:   check.ServiceID,
			ServiceName: check.ServiceName,
			ServiceTags: check.ServiceTags,
		})
	}

	switch relevant.AggregatedStatus() {
	case api.HealthPassing:
		return passing
	case api.HealthWarning:
		return warning
	case api.HealthMaint, api.HealthCritical:
		// Maintenance is not used in theory; critical should not happen
		// since such nodes are already filtered.
		return 0
	default:
		// When non-standard status, return 1
		return 1
	}
}