diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 698d4c1ddb..6f00ec4b08 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -8,15 +8,15 @@ import ( "sort" "github.com/armon/go-metrics" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" ) // Health endpoint is used to query the health information @@ -250,69 +250,86 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc func(ws memdb.WatchSet, state *state.Store) error { var thisReply structs.IndexedCheckServiceNodes - index, nodes, err := f(ws, state, args) + sgIdx, sgArgs, err := h.getArgsForSamenessGroupMembers(args, ws, state) if err != nil { return err } - resolvedNodes := nodes - if args.MergeCentralConfig { - for _, node := range resolvedNodes { - ns := node.Service - if ns.IsSidecarProxy() || ns.IsGateway() { - cfgIndex, mergedns, err := configentry.MergeNodeServiceWithCentralConfig(ws, state, ns, h.logger) - if err != nil { - return err - } - if cfgIndex > index { - index = cfgIndex + for _, arg := range sgArgs { + index, nodes, err := f(ws, state, arg) + if err != nil { + return err + } + + resolvedNodes := nodes + if arg.MergeCentralConfig { + for _, node := range resolvedNodes { + ns := node.Service + if ns.IsSidecarProxy() || ns.IsGateway() { + cfgIndex, mergedns, err := configentry.MergeNodeServiceWithCentralConfig(ws, state, ns, h.logger) + if err != nil { + return err + } + if cfgIndex > index { + index = cfgIndex + } + *node.Service = *mergedns } - *node.Service = *mergedns } - } - // Generate a hash of the resolvedNodes driving this response. - // Use it to determine if the response is identical to a prior wakeup. - newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil) - if err != nil { - return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) - } - if ranMergeOnce && priorMergeHash == newMergeHash { - // the below assignment is not required as the if condition already validates equality, - // but makes it more clear that prior value is being reset to the new hash on each run. - priorMergeHash = newMergeHash - reply.Index = index - // NOTE: the prior response is still alive inside of *reply, which is desirable - return errNotChanged - } else { - priorMergeHash = newMergeHash - ranMergeOnce = true - } + // Generate a hash of the resolvedNodes driving this response. + // Use it to determine if the response is identical to a prior wakeup. + newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + if ranMergeOnce && priorMergeHash == newMergeHash { + // the below assignment is not required as the if condition already validates equality, + // but makes it more clear that prior value is being reset to the new hash on each run. + priorMergeHash = newMergeHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which is desirable + return errNotChanged + } else { + priorMergeHash = newMergeHash + ranMergeOnce = true + } - } + } - thisReply.Index, thisReply.Nodes = index, resolvedNodes + thisReply.Index, thisReply.Nodes = index, resolvedNodes - if len(args.NodeMetaFilters) > 0 { - thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes) - } + if len(arg.NodeMetaFilters) > 0 { + thisReply.Nodes = nodeMetaFilter(arg.NodeMetaFilters, thisReply.Nodes) + } - raw, err := filter.Execute(thisReply.Nodes) - if err != nil { - return err - } - filteredNodes := raw.(structs.CheckServiceNodes) - thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: args.HealthFilterType}) + raw, err := filter.Execute(thisReply.Nodes) + if err != nil { + return err + } + filteredNodes := raw.(structs.CheckServiceNodes) + thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: arg.HealthFilterType}) + + // Note: we filter the results with ACLs *after* applying the user-supplied + // bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include + // results that would be filtered out even if the user did have permission. + if err := h.srv.filterACL(arg.Token, &thisReply); err != nil { + return err + } - // Note: we filter the results with ACLs *after* applying the user-supplied - // bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include - // results that would be filtered out even if the user did have permission. - if err := h.srv.filterACL(args.Token, &thisReply); err != nil { - return err + if err := h.srv.sortNodesByDistanceFrom(arg.Source, thisReply.Nodes); err != nil { + return err + } + if len(thisReply.Nodes) > 0 { + break + } } - if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil { - return err + // If sameness group was used, evaluate the index of the sameness group + // and update the index of the response if it is greater. If sameness group is not + // used, the sgIdx will be 0 in this evaluation. + if sgIdx > thisReply.Index { + thisReply.Index = sgIdx } *reply = thisReply diff --git a/agent/consul/health_endpoint_ce.go b/agent/consul/health_endpoint_ce.go new file mode 100644 index 0000000000..1580054e5d --- /dev/null +++ b/agent/consul/health_endpoint_ce.go @@ -0,0 +1,38 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !consulent + +package consul + +import ( + "errors" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" +) + +// getArgsForSamenessGroupMembers returns the arguments for the sameness group members if SamenessGroup +// field is set in the ServiceSpecificRequest. It returns the index of the sameness group, the arguments +// for the sameness group members and an error if any. +// If SamenessGroup is not set, it returns: +// - the index 0 +// - an array containing the original arguments +// - nil error +// If SamenessGroup is set on CE, it returns:: +// - the index of 0 +// - nil array +// - an error indicating that sameness groups are not supported in consul CE +// If SamenessGroup is set on ENT, it returns: +// - the index of the sameness group +// - an array containing the arguments for the sameness group members +// - nil error +func (h *Health) getArgsForSamenessGroupMembers(args *structs.ServiceSpecificRequest, + ws memdb.WatchSet, state *state.Store) (uint64, []*structs.ServiceSpecificRequest, error) { + if args.SamenessGroup != "" { + return 0, nil, errors.New("sameness groups are not supported in consul CE") + } + return 0, []*structs.ServiceSpecificRequest{args}, nil +} diff --git a/agent/discovery/query_fetcher_v1.go b/agent/discovery/query_fetcher_v1.go index e2d424242e..da76d744dd 100644 --- a/agent/discovery/query_fetcher_v1.go +++ b/agent/discovery/query_fetcher_v1.go @@ -5,7 +5,6 @@ package discovery import ( "context" - "errors" "fmt" "net" "strings" @@ -469,7 +468,7 @@ func (f *V1DataFetcher) buildResultsFromServiceNodes(nodes []structs.CheckServic Namespace: n.Service.NamespaceOrEmpty(), Partition: n.Service.PartitionOrEmpty(), Datacenter: n.Node.Datacenter, - PeerName: req.Tenancy.Peer, + PeerName: n.Service.PeerName, }, }) } @@ -542,23 +541,10 @@ RPC: return &out, nil } +// fetchService is used to look up a service in the Consul catalog. func (f *V1DataFetcher) fetchService(ctx Context, req *QueryPayload, cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) { - f.logger.Trace("fetchService", "req", req) - if req.Tenancy.SamenessGroup == "" { - return f.fetchServiceBasedOnTenancy(ctx, req, cfg, lookupType) - } - - return f.fetchServiceFromSamenessGroup(ctx, req, cfg, lookupType) -} - -// fetchServiceBasedOnTenancy is used to look up a service in the Consul catalog based on its tenancy or default tenancy. -func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayload, - cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) { - f.logger.Trace(fmt.Sprintf("fetchServiceBasedOnTenancy - req: %+v", req)) - if req.Tenancy.SamenessGroup != "" { - return nil, errors.New("sameness groups are not allowed for service lookups based on tenancy") - } + f.logger.Trace(fmt.Sprintf("fetchService - req: %+v", req)) // If no datacenter is passed, default to our own datacenter := cfg.Datacenter @@ -573,14 +559,13 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa if req.Tag != "" { serviceTags = []string{req.Tag} } - healthFilterType := structs.HealthFilterExcludeCritical if cfg.OnlyPassing { healthFilterType = structs.HealthFilterIncludeOnlyPassing } - args := structs.ServiceSpecificRequest{ PeerName: req.Tenancy.Peer, + SamenessGroup: req.Tenancy.SamenessGroup, Connect: lookupType == LookupTypeConnect, Ingress: lookupType == LookupTypeIngress, Datacenter: datacenter, @@ -611,11 +596,6 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa return nil, ErrNotFound } - // If we have no nodes, return not found! - if len(out.Nodes) == 0 { - return nil, ErrNotFound - } - // Perform a random shuffle out.Nodes.Shuffle() return f.buildResultsFromServiceNodes(out.Nodes, req, nil), nil diff --git a/agent/discovery/query_fetcher_v1_ce.go b/agent/discovery/query_fetcher_v1_ce.go index 59d32e91e2..090db0e5f7 100644 --- a/agent/discovery/query_fetcher_v1_ce.go +++ b/agent/discovery/query_fetcher_v1_ce.go @@ -6,9 +6,6 @@ package discovery import ( - "errors" - "fmt" - "github.com/hashicorp/consul/acl" ) @@ -27,12 +24,3 @@ func validateEnterpriseTenancy(req QueryTenancy) error { func queryTenancyToEntMeta(_ QueryTenancy) acl.EnterpriseMeta { return acl.EnterpriseMeta{} } - -// fetchServiceFromSamenessGroup fetches a service from a sameness group. -func (f *V1DataFetcher) fetchServiceFromSamenessGroup(ctx Context, req *QueryPayload, cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) { - f.logger.Trace(fmt.Sprintf("fetchServiceFromSamenessGroup - req: %+v", req)) - if req.Tenancy.SamenessGroup == "" { - return nil, errors.New("sameness groups must be provided for service lookups") - } - return f.fetchServiceBasedOnTenancy(ctx, req, cfg, lookupType) -} diff --git a/agent/discovery/query_fetcher_v1_test.go b/agent/discovery/query_fetcher_v1_test.go index 450b0cb13a..f56e1f61b8 100644 --- a/agent/discovery/query_fetcher_v1_test.go +++ b/agent/discovery/query_fetcher_v1_test.go @@ -182,8 +182,9 @@ func Test_FetchEndpoints(t *testing.T) { Node: "node-name", }, Service: &structs.NodeService{ - Address: "service-address", - Service: "service-name", + Address: "service-address", + Service: "service-name", + PeerName: "test-peer", }, }, }, diff --git a/agent/dns.go b/agent/dns.go index 8ddfcaca3f..92dcd273e0 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -92,6 +92,7 @@ type serviceLookup struct { PeerName string Datacenter string Service string + SamenessGroup string Tag string MaxRecursionLevel int Connect bool @@ -439,18 +440,11 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) { // server side to avoid transferring the entire node list. if err := d.agent.RPC(context.Background(), "Catalog.ListNodes", &args, &out); err == nil { for _, n := range out.Nodes { - lookup := serviceLookup{ - // Peering PTR lookups are currently not supported, so we don't - // need to populate that field for creating the node FQDN. - // PeerName: n.PeerName, - Datacenter: n.Datacenter, - EnterpriseMeta: *n.GetEnterpriseMeta(), - } arpa, _ := dns.ReverseAddr(n.Address) if arpa == qName { ptr := &dns.PTR{ Hdr: dns.RR_Header{Name: q.Name, Rrtype: dns.TypePTR, Class: dns.ClassINET, Ttl: 0}, - Ptr: nodeCanonicalDNSName(lookup, n.Node, d.domain), + Ptr: nodeCanonicalDNSName(n, d.domain), } m.Answer = append(m.Answer, ptr) break @@ -738,6 +732,10 @@ type queryLocality struct { // not be shared between datacenters. In all other cases, it should be considered a DC. peerOrDatacenter string + // samenessGroup is the samenessGroup name parsed from a label that has explicit parts. + // Example query: .service..sg.consul + samenessGroup string + acl.EnterpriseMeta } @@ -805,59 +803,56 @@ func (d *DNSServer) dispatch(remoteAddr net.Addr, req, resp *dns.Msg, maxRecursi return invalid() } - localities, err := d.parseSamenessGroupLocality(cfg, querySuffixes, invalid) + locality, err := d.parseSamenessGroupLocality(cfg, querySuffixes, invalid) if err != nil { return err } - // Loop over the localities and return as soon as a lookup is successful - for _, locality := range localities { - d.logger.Debug("labels", "querySuffixes", querySuffixes) - - lookup := serviceLookup{ - Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter), - PeerName: locality.peer, - Connect: false, - Ingress: false, - MaxRecursionLevel: maxRecursionLevel, - EnterpriseMeta: locality.EnterpriseMeta, - } - // Only one of dc or peer can be used. - if lookup.PeerName != "" { - lookup.Datacenter = "" - } - - // Support RFC 2782 style syntax - if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") { - // Grab the tag since we make nuke it if it's tcp - tag := queryParts[1][1:] + lookup := serviceLookup{ + Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter), + PeerName: locality.peer, + SamenessGroup: locality.samenessGroup, + Connect: false, + Ingress: false, + MaxRecursionLevel: maxRecursionLevel, + EnterpriseMeta: locality.EnterpriseMeta, + } - // Treat _name._tcp.service.consul as a default, no need to filter on that tag - if tag == "tcp" { - tag = "" - } + // Only one of dc or peer can be used. + if lookup.PeerName != "" { + lookup.Datacenter = "" + } - lookup.Tag = tag - lookup.Service = queryParts[0][1:] - // _name._tag.service.consul - } else { - // Consul 0.3 and prior format for SRV queries - // Support "." in the label, re-join all the parts - tag := "" - if n >= 2 { - tag = strings.Join(queryParts[:n-1], ".") - } + // Support RFC 2782 style syntax + if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") { + // Grab the tag since we make nuke it if it's tcp + tag := queryParts[1][1:] - lookup.Tag = tag - lookup.Service = queryParts[n-1] - // tag[.tag].name.service.consul + // Treat _name._tcp.service.consul as a default, no need to filter on that tag + if tag == "tcp" { + tag = "" } - err = d.handleServiceQuery(cfg, lookup, req, resp) - // Return if we are error free right away, otherwise loop again if we can - if err == nil { - return nil + lookup.Tag = tag + lookup.Service = queryParts[0][1:] + // _name._tag.service.consul + } else { + // Consul 0.3 and prior format for SRV queries + // Support "." in the label, re-join all the parts + tag := "" + if n >= 2 { + tag = strings.Join(queryParts[:n-1], ".") } + + lookup.Tag = tag + lookup.Service = queryParts[n-1] + // tag[.tag].name.service.consul + } + + err = d.handleServiceQuery(cfg, lookup, req, resp) + // Return if we are error free right away, otherwise loop again if we can + if err == nil { + return nil } // We've exhausted all DNS possibilities so return here @@ -1456,6 +1451,7 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st } args := structs.ServiceSpecificRequest{ PeerName: lookup.PeerName, + SamenessGroup: lookup.SamenessGroup, Connect: lookup.Connect, Ingress: lookup.Ingress, Datacenter: lookup.Datacenter, @@ -1758,20 +1754,20 @@ func findWeight(node structs.CheckServiceNode) int { } } -func (d *DNSServer) encodeIPAsFqdn(questionName string, lookup serviceLookup, ip net.IP) string { +func (d *DNSServer) encodeIPAsFqdn(questionName string, serviceNode structs.CheckServiceNode, ip net.IP) string { ipv4 := ip.To4() respDomain := d.getResponseDomain(questionName) ipStr := hex.EncodeToString(ip) if ipv4 != nil { ipStr = ipStr[len(ipStr)-(net.IPv4len*2):] } - if lookup.PeerName != "" { + if serviceNode.Service.PeerName != "" { // Exclude the datacenter from the FQDN on the addr for peers. // This technically makes no difference, since the addr endpoint ignores the DC // component of the request, but do it anyway for a less confusing experience. return fmt.Sprintf("%s.addr.%s", ipStr, respDomain) } - return fmt.Sprintf("%s.addr.%s.%s", ipStr, lookup.Datacenter, respDomain) + return fmt.Sprintf("%s.addr.%s.%s", ipStr, serviceNode.Node.Datacenter, respDomain) } // Craft dns records for a an A record for an IP address @@ -1860,7 +1856,7 @@ func (d *DNSServer) makeRecordFromServiceNode(lookup serviceLookup, serviceNode if q.Qtype == dns.TypeSRV { respDomain := d.getResponseDomain(q.Name) - nodeFQDN := nodeCanonicalDNSName(lookup, serviceNode.Node.Node, respDomain) + nodeFQDN := nodeCanonicalDNSName(serviceNode.Node, respDomain) answers := []dns.RR{ &dns.SRV{ Hdr: dns.RR_Header{ @@ -1895,7 +1891,7 @@ func (d *DNSServer) makeRecordFromIP(lookup serviceLookup, addr net.IP, serviceN } if q.Qtype == dns.TypeSRV { - ipFQDN := d.encodeIPAsFqdn(q.Name, lookup, addr) + ipFQDN := d.encodeIPAsFqdn(q.Name, serviceNode, addr) answers := []dns.RR{ &dns.SRV{ Hdr: dns.RR_Header{ @@ -2076,7 +2072,7 @@ func (d *DNSServer) addServiceSRVRecordsToMessage(cfg *dnsConfig, lookup service resp.Extra = append(resp.Extra, extra...) if cfg.NodeMetaTXT { - resp.Extra = append(resp.Extra, d.makeTXTRecordFromNodeMeta(nodeCanonicalDNSName(lookup, node.Node.Node, respDomain), node.Node, ttl)...) + resp.Extra = append(resp.Extra, d.makeTXTRecordFromNodeMeta(nodeCanonicalDNSName(node.Node, respDomain), node.Node, ttl)...) } } } diff --git a/agent/dns_ce.go b/agent/dns_ce.go index 4eb74442fa..171a789945 100644 --- a/agent/dns_ce.go +++ b/agent/dns_ce.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/config" + "github.com/hashicorp/consul/agent/structs" ) // NOTE: these functions have also been copied to agent/dns package for dns v2. @@ -63,27 +64,27 @@ func (d *DNSServer) parseLocality(labels []string, cfg *dnsConfig) (queryLocalit type querySameness struct{} // parseSamenessGroupLocality wraps parseLocality in CE -func (d *DNSServer) parseSamenessGroupLocality(cfg *dnsConfig, labels []string, errfnc func() error) ([]queryLocality, error) { +func (d *DNSServer) parseSamenessGroupLocality(cfg *dnsConfig, labels []string, errfnc func() error) (queryLocality, error) { locality, ok := d.parseLocality(labels, cfg) if !ok { - return nil, errfnc() + return queryLocality{}, errfnc() } - return []queryLocality{locality}, nil + return locality, nil } func serviceCanonicalDNSName(name, kind, datacenter, domain string, _ *acl.EnterpriseMeta) string { return fmt.Sprintf("%s.%s.%s.%s", name, kind, datacenter, domain) } -func nodeCanonicalDNSName(lookup serviceLookup, nodeName, respDomain string) string { - if lookup.PeerName != "" { +func nodeCanonicalDNSName(node *structs.Node, respDomain string) string { + if node.PeerName != "" { // We must return a more-specific DNS name for peering so // that there is no ambiguity with lookups. return fmt.Sprintf("%s.node.%s.peer.%s", - nodeName, - lookup.PeerName, + node.Node, + node.PeerName, respDomain) } // Return a simpler format for non-peering nodes. - return fmt.Sprintf("%s.node.%s.%s", nodeName, lookup.Datacenter, respDomain) + return fmt.Sprintf("%s.node.%s.%s", node.Node, node.Datacenter, respDomain) } diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 9c3492faf1..0001c35a11 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -188,6 +188,10 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re } s.parsePeerName(req, &args) + s.parseSamenessGroup(req, &args) + if args.SamenessGroup != "" && args.PeerName != "" { + return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "peer-name and sameness-group are mutually exclusive"} + } // Check for tags params := req.URL.Query() @@ -214,7 +218,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re prefix = "/v1/health/service/" } - // Parse out the service name from the query params + // Parse the service name from the query params args.ServiceName = strings.TrimPrefix(req.URL.Path, prefix) if args.ServiceName == "" { return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"} diff --git a/agent/health_endpoint_ce_test.go b/agent/health_endpoint_ce_test.go new file mode 100644 index 0000000000..fd02d5a553 --- /dev/null +++ b/agent/health_endpoint_ce_test.go @@ -0,0 +1,30 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !consulent + +package agent + +import ( + "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" +) + +func TestHealthServiceNodes_SamenessGroup_ErrorsOnCE(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&sameness-group=foo", nil) + resp := httptest.NewRecorder() + _, err := a.srv.HealthServiceNodes(resp, req) + require.ErrorContains(t, err, "sameness groups are not supported in consul CE") +} diff --git a/agent/http.go b/agent/http.go index d828ed04c1..66c3a8bd36 100644 --- a/agent/http.go +++ b/agent/http.go @@ -739,7 +739,7 @@ func decodeBody(body io.Reader, out interface{}) error { return lib.DecodeJSON(body, out) } -// decodeBodyDeprecated is deprecated, please ues decodeBody above. +// decodeBodyDeprecated is deprecated, please use decodeBody above. // decodeBodyDeprecated is used to decode a JSON request body func decodeBodyDeprecated(req *http.Request, out interface{}, cb func(interface{}) error) error { // This generally only happens in tests since real HTTP requests set @@ -1208,6 +1208,15 @@ func (s *HTTPHandlers) parsePeerName(req *http.Request, args *structs.ServiceSpe } } +func (s *HTTPHandlers) parseSamenessGroup(req *http.Request, args *structs.ServiceSpecificRequest) { + if sg := req.URL.Query().Get("sg"); sg != "" { + args.SamenessGroup = sg + } + if sg := req.URL.Query().Get("sameness-group"); sg != "" { + args.SamenessGroup = sg + } +} + // parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for // filtering results to nodes with the given metadata key/value func (s *HTTPHandlers) parseMetaFilter(req *http.Request) map[string]string { diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 6732b2f88e..c20daa2e82 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -105,7 +105,12 @@ func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { // Streaming is incompatible with NearestN queries (due to lack of ordering), // so we can only use it if the NearestN would never work (Node == "") // or if we explicitly say to ignore the Node field for queries (agentless xDS). - (req.Source.Node == "" || req.Source.DisableNode) + (req.Source.Node == "" || req.Source.DisableNode) && + // Streaming is incompatible with SamenessGroup queries at the moment because + // the subscribe functionality maps to queries based on the service name and tenancy information + // it does not support the ability to subscribe to the same service in different partitions or peers + // and materialize the results into a single view with the first healthy sameness group member. + req.SamenessGroup == "" } func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest { diff --git a/agent/rpcclient/health/health_test.go b/agent/rpcclient/health/health_test.go index 30900bc04c..24c1de120f 100644 --- a/agent/rpcclient/health/health_test.go +++ b/agent/rpcclient/health/health_test.go @@ -98,6 +98,17 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) { }, expected: useRPC, }, + { + name: "rpc if sameness group", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + SamenessGroup: "sg1", + MergeCentralConfig: false, + QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, + }, + expected: useRPC, + }, } for _, tc := range testCases { @@ -246,6 +257,15 @@ func TestClient_Notify_BackendRouting(t *testing.T) { }, expected: useCache, }, + { + name: "use cache for sameness group request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + SamenessGroup: "test-group", + }, + expected: useCache, + }, } for _, tc := range testCases { diff --git a/agent/structs/deep-copy.sh b/agent/structs/deep-copy.sh index 1bc4ededd6..242d4e5ab2 100755 --- a/agent/structs/deep-copy.sh +++ b/agent/structs/deep-copy.sh @@ -52,6 +52,7 @@ deep-copy \ -type ServiceRoute \ -type ServiceRouteDestination \ -type ServiceRouteMatch \ + -type ServiceSpecificRequest \ -type TCPRouteConfigEntry \ -type Upstream \ -type UpstreamConfiguration \ diff --git a/agent/structs/structs.deepcopy.go b/agent/structs/structs.deepcopy.go index 9c9a7c8bc9..7d26a76469 100644 --- a/agent/structs/structs.deepcopy.go +++ b/agent/structs/structs.deepcopy.go @@ -1,4 +1,4 @@ -// generated by deep-copy -pointer-receiver -o ./structs.deepcopy.go -type APIGatewayListener -type BoundAPIGatewayListener -type CARoot -type CheckServiceNode -type CheckType -type CompiledDiscoveryChain -type ConnectProxyConfig -type DiscoveryFailover -type DiscoveryGraphNode -type DiscoveryResolver -type DiscoveryRoute -type DiscoverySplit -type ExposeConfig -type ExportedServicesConfigEntry -type FileSystemCertificateConfigEntry -type GatewayService -type GatewayServiceTLSConfig -type HTTPHeaderModifiers -type HTTPRouteConfigEntry -type HashPolicy -type HealthCheck -type IndexedCARoots -type IngressListener -type InlineCertificateConfigEntry -type Intention -type IntentionPermission -type LoadBalancer -type MeshConfigEntry -type MeshDirectionalTLSConfig -type MeshTLSConfig -type Node -type NodeService -type PeeringServiceMeta -type ServiceConfigEntry -type ServiceConfigResponse -type ServiceConnect -type ServiceDefinition -type ServiceResolverConfigEntry -type ServiceResolverFailover -type ServiceRoute -type ServiceRouteDestination -type ServiceRouteMatch -type TCPRouteConfigEntry -type Upstream -type UpstreamConfiguration -type Status -type BoundAPIGatewayConfigEntry ./; DO NOT EDIT. +// generated by deep-copy -pointer-receiver -o ./structs.deepcopy.go -type APIGatewayListener -type BoundAPIGatewayListener -type CARoot -type CheckServiceNode -type CheckType -type CompiledDiscoveryChain -type ConnectProxyConfig -type DiscoveryFailover -type DiscoveryGraphNode -type DiscoveryResolver -type DiscoveryRoute -type DiscoverySplit -type ExposeConfig -type ExportedServicesConfigEntry -type FileSystemCertificateConfigEntry -type GatewayService -type GatewayServiceTLSConfig -type HTTPHeaderModifiers -type HTTPRouteConfigEntry -type HashPolicy -type HealthCheck -type IndexedCARoots -type IngressListener -type InlineCertificateConfigEntry -type Intention -type IntentionPermission -type LoadBalancer -type MeshConfigEntry -type MeshDirectionalTLSConfig -type MeshTLSConfig -type Node -type NodeService -type PeeringServiceMeta -type ServiceConfigEntry -type ServiceConfigResponse -type ServiceConnect -type ServiceDefinition -type ServiceResolverConfigEntry -type ServiceResolverFailover -type ServiceRoute -type ServiceRouteDestination -type ServiceRouteMatch -type ServiceSpecificRequest -type TCPRouteConfigEntry -type Upstream -type UpstreamConfiguration -type Status -type BoundAPIGatewayConfigEntry ./; DO NOT EDIT. package structs @@ -1197,6 +1197,22 @@ func (o *ServiceRouteMatch) DeepCopy() *ServiceRouteMatch { return &cp } +// DeepCopy generates a deep copy of *ServiceSpecificRequest +func (o *ServiceSpecificRequest) DeepCopy() *ServiceSpecificRequest { + var cp ServiceSpecificRequest = *o + if o.NodeMetaFilters != nil { + cp.NodeMetaFilters = make(map[string]string, len(o.NodeMetaFilters)) + for k2, v2 := range o.NodeMetaFilters { + cp.NodeMetaFilters[k2] = v2 + } + } + if o.ServiceTags != nil { + cp.ServiceTags = make([]string, len(o.ServiceTags)) + copy(cp.ServiceTags, o.ServiceTags) + } + return &cp +} + // DeepCopy generates a deep copy of *TCPRouteConfigEntry func (o *TCPRouteConfigEntry) DeepCopy() *TCPRouteConfigEntry { var cp TCPRouteConfigEntry = *o diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 1a90d863ca..cae14af52b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -753,6 +753,9 @@ type ServiceSpecificRequest struct { // The name of the peer that the requested service was imported from. PeerName string + // The name of the sameness group that should be the target of the query. + SamenessGroup string + NodeMetaFilters map[string]string ServiceName string ServiceKind ServiceKind @@ -821,6 +824,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo { r.Filter, r.EnterpriseMeta, r.PeerName, + r.SamenessGroup, r.Ingress, r.ServiceKind, r.MergeCentralConfig, diff --git a/api/api.go b/api/api.go index 5c23ae8b76..b90a45d92b 100644 --- a/api/api.go +++ b/api/api.go @@ -117,6 +117,13 @@ type QueryOptions struct { // Note: Partitions are available only in Consul Enterprise Partition string + // SamenessGroup is used find the SamenessGroup in the given + // Partition and will find the failover order for the Service + // from the SamenessGroup Members, with the given Partition being + // the first member. + // Note: SamenessGroups are available only in Consul Enterprise + SamenessGroup string + // Providing a datacenter overwrites the DC provided // by the Config Datacenter string @@ -847,6 +854,12 @@ func (r *request) setQueryOptions(q *QueryOptions) { // rather than the alternative short-hand "ap" r.params.Set("partition", q.Partition) } + if q.SamenessGroup != "" { + // For backwards-compatibility with existing tests, + // use the long-hand query param name "sameness-group" + // rather than the alternative short-hand "sg" + r.params.Set("sameness-group", q.SamenessGroup) + } if q.Datacenter != "" { // For backwards-compatibility with existing tests, // use the short-hand query param name "dc"